diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java
new file mode 100644
index 000000000000..dc36c7aeccc0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/** An immutable value holding one message pulled from Pubsub plus its delivery metadata. */
+@AutoValue
+@DefaultCoder(IncomingMessageCoder.class)
+abstract class IncomingMessage {
+
+  /** The Pubsub message as received. */
+  public abstract PubsubMessage message();
+
+  /**
+   * Element timestamp in milliseconds since the epoch: either Pubsub's processing time or the
+   * custom timestamp associated with the message.
+   */
+  public abstract long timestampMsSinceEpoch();
+
+  /** System time, in milliseconds since the epoch, at which the message was requested. */
+  public abstract long requestTimeMsSinceEpoch();
+
+  /** Id handed back to Pubsub to acknowledge receipt of this message. */
+  public abstract String ackId();
+
+  /** Id handed to the runner so it can tell this message apart from all others. */
+  public abstract String recordId();
+
+  /** Builds an instance from its five components, given in declaration order. */
+  public static IncomingMessage of(
+      PubsubMessage message,
+      long timestampMsSinceEpoch,
+      long requestTimeMsSinceEpoch,
+      String ackId,
+      String recordId) {
+    return new AutoValue_IncomingMessage(
+        message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java
new file mode 100644
index 000000000000..a4c64c091c9a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A coder for IncomingMessage.
+ *
+ * <p>The wire format is the concatenation, in this order, of: the Pubsub message (protobuf), the
+ * element timestamp, the request time, the ack id and the record id. {@link #decode} reads the
+ * fields back in the same order, so the two methods must be kept in sync.
+ */
+public class IncomingMessageCoder extends CustomCoder<IncomingMessage> {
+  // Component coders, created once rather than on every encode/decode call.
+  private static final ProtoCoder<PubsubMessage> MESSAGE_CODER =
+      ProtoCoder.of(PubsubMessage.class);
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+
+  /** Returns a provider that supplies this coder for {@code IncomingMessage}. */
+  public static CoderProvider getCoderProvider() {
+    return CoderProviders.forCoder(
+        TypeDescriptor.of(IncomingMessage.class), new IncomingMessageCoder());
+  }
+
+  /** Returns a new coder instance. */
+  public static IncomingMessageCoder of() {
+    return new IncomingMessageCoder();
+  }
+
+  /**
+   * Writes every field of {@code value} to {@code outStream}.
+   *
+   * @throws IOException if a component coder fails to write
+   */
+  @Override
+  public void encode(IncomingMessage value, OutputStream outStream) throws IOException {
+    MESSAGE_CODER.encode(value.message(), outStream);
+    LONG_CODER.encode(value.timestampMsSinceEpoch(), outStream);
+    LONG_CODER.encode(value.requestTimeMsSinceEpoch(), outStream);
+    STRING_CODER.encode(value.ackId(), outStream);
+    STRING_CODER.encode(value.recordId(), outStream);
+  }
+
+  /**
+   * Reads one message from {@code inStream}, consuming the fields in the order {@link #encode}
+   * wrote them.
+   *
+   * @throws IOException if a component coder fails to read
+   */
+  @Override
+  public IncomingMessage decode(InputStream inStream) throws IOException {
+    // Locals make the read order explicit; it must mirror encode().
+    PubsubMessage message = MESSAGE_CODER.decode(inStream);
+    long timestampMsSinceEpoch = LONG_CODER.decode(inStream);
+    long requestTimeMsSinceEpoch = LONG_CODER.decode(inStream);
+    String ackId = STRING_CODER.decode(inStream);
+    String recordId = STRING_CODER.decode(inStream);
+    return IncomingMessage.of(
+        message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java
new file mode 100644
index 000000000000..7d855f85ec42
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.ByteString;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/** An immutable value holding one message that is about to be published to Pubsub. */
+@AutoValue
+@DefaultCoder(OutgoingMessageCoder.class)
+abstract class OutgoingMessage {
+
+  /** The Pubsub message to publish; its publish timestamp may be unset. */
+  public abstract com.google.pubsub.v1.PubsubMessage message();
+
+  /** Element timestamp, in milliseconds since the epoch. */
+  public abstract long timestampMsSinceEpoch();
+
+  /**
+   * Record id to attach to this record's metadata so the receiver can reject duplicates, when an
+   * id attribute is in use; {@literal null} otherwise.
+   */
+  @Nullable
+  public abstract String recordId();
+
+  /** Wraps an already-built Pubsub protobuf message. */
+  public static OutgoingMessage of(
+      com.google.pubsub.v1.PubsubMessage message,
+      long timestampMsSinceEpoch,
+      @Nullable String recordId) {
+    return new AutoValue_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
+  }
+
+  /**
+   * Converts a Beam {@link PubsubMessage} into its protobuf form and wraps it. The payload is
+   * copied; attributes are copied only when the attribute map is non-null.
+   */
+  public static OutgoingMessage of(
+      PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
+    com.google.pubsub.v1.PubsubMessage.Builder proto =
+        com.google.pubsub.v1.PubsubMessage.newBuilder();
+    proto.setData(ByteString.copyFrom(message.getPayload()));
+    if (message.getAttributeMap() != null) {
+      proto.putAllAttributes(message.getAttributeMap());
+    }
+    return of(proto.build(), timestampMsSinceEpoch, recordId);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java
new file mode 100644
index 000000000000..2436324529bd
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A coder for OutgoingMessage.
+ *
+ * <p>The wire format is the concatenation, in this order, of: the Pubsub message (protobuf), the
+ * element timestamp and the nullable record id. {@link #decode} reads the fields back in the same
+ * order, so the two methods must be kept in sync.
+ */
+public class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+  // Component coders, created once rather than on every encode/decode call.
+  private static final ProtoCoder<PubsubMessage> MESSAGE_CODER =
+      ProtoCoder.of(PubsubMessage.class);
+  private static final VarLongCoder TIMESTAMP_CODER = VarLongCoder.of();
+  // The record id is null when no id attribute is in use, hence the nullable wrapper.
+  private static final NullableCoder<String> RECORD_ID_CODER =
+      NullableCoder.of(StringUtf8Coder.of());
+
+  /** Returns a provider that supplies this coder for {@code OutgoingMessage}. */
+  public static CoderProvider getCoderProvider() {
+    return CoderProviders.forCoder(
+        TypeDescriptor.of(OutgoingMessage.class), new OutgoingMessageCoder());
+  }
+
+  /** Returns a new coder instance. */
+  public static OutgoingMessageCoder of() {
+    return new OutgoingMessageCoder();
+  }
+
+  /**
+   * Writes every field of {@code value} to {@code outStream}.
+   *
+   * @throws IOException if a component coder fails to write
+   */
+  @Override
+  public void encode(OutgoingMessage value, OutputStream outStream) throws IOException {
+    MESSAGE_CODER.encode(value.message(), outStream);
+    TIMESTAMP_CODER.encode(value.timestampMsSinceEpoch(), outStream);
+    RECORD_ID_CODER.encode(value.recordId(), outStream);
+  }
+
+  /**
+   * Reads one message from {@code inStream}, consuming the fields in the order {@link #encode}
+   * wrote them.
+   *
+   * @throws IOException if a component coder fails to read
+   */
+  @Override
+  public OutgoingMessage decode(InputStream inStream) throws IOException {
+    // Locals make the read order explicit; it must mirror encode().
+    PubsubMessage message = MESSAGE_CODER.decode(inStream);
+    long timestampMsSinceEpoch = TIMESTAMP_CODER.decode(inStream);
+    String recordId = RECORD_ID_CODER.decode(inStream);
+    return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java
new file mode 100644
index 000000000000..f0ae47f585c2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the 
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.naming.SizeLimitExceededException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Writer to Pubsub which batches messages from bounded collections.
+ *
+ * <p>Messages are buffered per bundle. The buffer is published when adding the next message would
+ * reach the configured byte limit or when it already holds the configured number of messages, and
+ * once more when the bundle finishes.
+ *
+ * @param <T> type of the input elements, converted to messages by the write's format function
+ */
+class PubsubBoundedWriter<T> extends DoFn<T, Void> {
+  /**
+   * Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into
+   * four bytes.
+   */
+  private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3;
+
+  private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+
+  // Per-bundle state: created in startBundle, released in finishBundle.
+  private transient List<OutgoingMessage> output;
+  private transient PubsubClient pubsubClient;
+  private transient int currentOutputBytes;
+
+  private final int maxPublishBatchByteSize;
+  private final int maxPublishBatchSize;
+  private final PubsubIO.Write<T> write;
+
+  /**
+   * Captures the batching limits from {@code write}, falling back to the defaults above when a
+   * limit is not set. The topic provider must be non-null.
+   */
+  private PubsubBoundedWriter(PubsubIO.Write<T> write) {
+    Preconditions.checkNotNull(write.getTopicProvider());
+    this.maxPublishBatchSize =
+        MoreObjects.firstNonNull(write.getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE);
+    this.maxPublishBatchByteSize =
+        MoreObjects.firstNonNull(write.getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
+    this.write = write;
+  }
+
+  /** Creates a writer configured from {@code write}. */
+  static <T> PubsubBoundedWriter<T> of(PubsubIO.Write<T> write) {
+    return new PubsubBoundedWriter<>(write);
+  }
+
+  /** Starts an empty batch and opens a Pubsub client for this bundle. */
+  @StartBundle
+  public void startBundle(StartBundleContext c) throws IOException {
+    this.output = new ArrayList<>();
+    this.currentOutputBytes = 0;
+
+    // NOTE: idAttribute is ignored.
+    this.pubsubClient =
+        write
+            .getPubsubClientFactory()
+            .newClient(
+                write.getTimestampAttribute(),
+                null,
+                c.getPipelineOptions().as(PubsubOptions.class));
+  }
+
+  /**
+   * Formats the element as a Pubsub message and appends it to the current batch, publishing the
+   * batch first if this message would not fit.
+   *
+   * @throws SizeLimitExceededException if the payload alone is larger than the batch byte limit
+   * @throws IOException if publishing the pending batch fails
+   */
+  @ProcessElement
+  public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
+    PubsubMessage message = write.getFormatFn().apply(c.element());
+    byte[] payload = message.getPayload();
+    Map<String, String> attributes = message.getAttributeMap();
+
+    if (payload.length > maxPublishBatchByteSize) {
+      String msg =
+          String.format(
+              "Pub/Sub message size (%d) exceeded maximum batch size (%d)",
+              payload.length, maxPublishBatchByteSize);
+      throw new SizeLimitExceededException(msg);
+    }
+
+    // Checking before adding the message stops us from violating the max bytes.
+    boolean byteLimitReached = (currentOutputBytes + payload.length) >= maxPublishBatchByteSize;
+    boolean countLimitReached = output.size() >= maxPublishBatchSize;
+    // Only flush a non-empty batch: when the first message of a batch is exactly
+    // maxPublishBatchByteSize bytes the byte test is already true, which would otherwise issue a
+    // publish call carrying no messages.
+    if (!output.isEmpty() && (byteLimitReached || countLimitReached)) {
+      publish();
+    }
+
+    com.google.pubsub.v1.PubsubMessage.Builder builder =
+        com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom(payload));
+    // The attribute map may be null (OutgoingMessage.of guards the same way); putAllAttributes
+    // does not accept null.
+    if (attributes != null) {
+      builder.putAllAttributes(attributes);
+    }
+
+    // NOTE: The record id is always null.
+    output.add(OutgoingMessage.of(builder.build(), c.timestamp().getMillis(), null));
+    currentOutputBytes += payload.length;
+  }
+
+  /**
+   * Publishes whatever is still buffered, then releases the per-bundle state.
+   *
+   * @throws IOException if the final publish or closing the client fails
+   */
+  @FinishBundle
+  public void finishBundle() throws IOException {
+    try {
+      if (!output.isEmpty()) {
+        publish();
+      }
+    } finally {
+      // Release state and close the client even when the final publish throws, so a failed
+      // bundle does not leak the client.
+      output = null;
+      currentOutputBytes = 0;
+      PubsubClient client = pubsubClient;
+      pubsubClient = null;
+      client.close();
+    }
+  }
+
+  /** Publishes the buffered batch to the configured topic and resets the buffer. */
+  private void publish() throws IOException {
+    PubsubTopic topic = write.getTopicProvider().get();
+    int n = pubsubClient.publish(PubsubClient.topicPathFromPath(topic.asPath()), output);
+    // The client reports how many messages it published; anything but the full batch is a bug.
+    checkState(n == output.size());
+    output.clear();
+    currentOutputBytes = 0;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder.delegate(write);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 6f0f54d8b21e..ab1fc744045e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -21,9 +21,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.DateTime; -import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; @@ -294,84 +291,6 @@ public static TopicPath topicPathFromName(String projectId, String topicName) { return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); } - /** - * A message to be sent to Pubsub. - * - *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java - * serialization is never used for non-test clients. - */ - @AutoValue - public abstract static class OutgoingMessage implements Serializable { - - /** Underlying Message. May not have publish timestamp set. */ - public abstract PubsubMessage message(); - - /** Timestamp for element (ms since epoch). */ - public abstract long timestampMsSinceEpoch(); - - /** - * If using an id attribute, the record id to associate with this record's metadata so the - * receiver can reject duplicates. Otherwise {@literal null}. - */ - @Nullable - public abstract String recordId(); - - public static OutgoingMessage of( - PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { - return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId); - } - - public static OutgoingMessage of( - org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message, - long timestampMsSinceEpoch, - @Nullable String recordId) { - PubsubMessage.Builder builder = - PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload())); - if (message.getAttributeMap() != null) { - builder.putAllAttributes(message.getAttributeMap()); - } - return of(builder.build(), timestampMsSinceEpoch, recordId); - } - } - - /** - * A message received from Pubsub. - * - *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java - * serialization is never used for non-test clients. - */ - @AutoValue - abstract static class IncomingMessage implements Serializable { - - /** Underlying Message. */ - public abstract PubsubMessage message(); - - /** - * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom - * timestamp associated with the message. - */ - public abstract long timestampMsSinceEpoch(); - - /** Timestamp (in system time) at which we requested the message (ms since epoch). */ - public abstract long requestTimeMsSinceEpoch(); - - /** Id to pass back to Pubsub to acknowledge receipt of this message. */ - public abstract String ackId(); - - /** Id to pass to the runner to distinguish this message from all others. */ - public abstract String recordId(); - - public static IncomingMessage of( - PubsubMessage message, - long timestampMsSinceEpoch, - long requestTimeMsSinceEpoch, - String ackId, - String recordId) { - return new AutoValue_PubsubClient_IncomingMessage( - message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId); - } - } - /** * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages published. 
*/ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index ee36fd20a08f..3891e84f0770 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -17,22 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import com.google.api.client.util.Clock; import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -import javax.naming.SizeLimitExceededException; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.PipelineRunner; @@ -43,7 +35,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.options.ValueProvider; @@ -52,7 +43,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.utils.AvroUtils; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.ParDo; @@ -93,7 +83,7 @@ public class PubsubIO { private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); /** Factory for creating pubsub client to manage transport. */ - private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubGrpcClient.FACTORY; /** * Project IDs must contain 6-63 lowercase letters, digits, or dashes. IDs must start with a @@ -587,11 +577,11 @@ public abstract static class Read extends PTransform> @Nullable abstract ValueProvider getTopicProvider(); - abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); - @Nullable abstract ValueProvider getSubscriptionProvider(); + abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); + /** The name of the message attribute to read timestamps from. */ @Nullable abstract String getTimestampAttribute(); @@ -858,14 +848,6 @@ private PubsubIO() {} /** Implementation of write methods. */ @AutoValue public abstract static class Write extends PTransform, PDone> { - /** - * Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into - * four bytes. - */ - private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3; - - private static final int MAX_PUBLISH_BATCH_SIZE = 100; - @Nullable abstract ValueProvider getTopicProvider(); @@ -998,15 +980,6 @@ public Write withIdAttribute(String idAttribute) { return toBuilder().setIdAttribute(idAttribute).build(); } - /** - * Used to write a PubSub message together with PubSub attributes. The user-supplied format - * function translates the input type T to a PubsubMessage object, which is used by the sink to - * separately set the PubSub message's payload and attributes. 
- */ - private Write withFormatFn(SimpleFunction formatFn) { - return toBuilder().setFormatFn(formatFn).build(); - } - @Override public PDone expand(PCollection input) { if (getTopicProvider() == null) { @@ -1015,12 +988,8 @@ public PDone expand(PCollection input) { switch (input.isBounded()) { case BOUNDED: - input.apply( - ParDo.of( - new PubsubBoundedWriter( - MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull( - getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)))); + // NOTE: idAttribute is ignored. + input.apply(ParDo.of(PubsubBoundedWriter.of(this))); return PDone.in(input.getPipeline()); case UNBOUNDED: return input @@ -1047,101 +1016,6 @@ public void populateDisplayData(DisplayData.Builder builder) { populateCommonDisplayData( builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); } - - /** - * Writer to Pubsub which batches messages from bounded collections. - * - *

Public so can be suppressed by runners. - */ - public class PubsubBoundedWriter extends DoFn { - private transient List output; - private transient PubsubClient pubsubClient; - private transient int currentOutputBytes; - - private int maxPublishBatchByteSize; - private int maxPublishBatchSize; - - PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) { - this.maxPublishBatchSize = maxPublishBatchSize; - this.maxPublishBatchByteSize = maxPublishBatchByteSize; - } - - PubsubBoundedWriter() { - this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT); - } - - @StartBundle - public void startBundle(StartBundleContext c) throws IOException { - this.output = new ArrayList<>(); - this.currentOutputBytes = 0; - - // NOTE: idAttribute is ignored. - this.pubsubClient = - getPubsubClientFactory() - .newClient( - getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { - byte[] payload; - PubsubMessage message = getFormatFn().apply(c.element()); - payload = message.getPayload(); - Map attributes = message.getAttributeMap(); - - if (payload.length > maxPublishBatchByteSize) { - String msg = - String.format( - "Pub/Sub message size (%d) exceeded maximum batch size (%d)", - payload.length, maxPublishBatchByteSize); - throw new SizeLimitExceededException(msg); - } - - // Checking before adding the message stops us from violating the max bytes - if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) - || (output.size() >= maxPublishBatchSize)) { - publish(); - } - - // NOTE: The record id is always null. 
- output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), - c.timestamp().getMillis(), - null)); - currentOutputBytes += payload.length; - } - - @FinishBundle - public void finishBundle() throws IOException { - if (!output.isEmpty()) { - publish(); - } - output = null; - currentOutputBytes = 0; - pubsubClient.close(); - pubsubClient = null; - } - - private void publish() throws IOException { - PubsubTopic topic = getTopicProvider().get(); - int n = - pubsubClient.publish( - PubsubClient.topicPathFromName(topic.project, topic.topic), output); - checkState(n == output.size()); - output.clear(); - currentOutputBytes = 0; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.delegate(Write.this); - } - } } private static SerializableFunction parsePayloadUsingCoder(Coder coder) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 8be8c56c731f..57da4c3bee40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 230161ca1433..e338eab94b54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -384,7 +384,7 @@ static class PubsubReader extends UnboundedSource.UnboundedReader * Messages we have received from Pubsub and not yet delivered downstream. We preserve their * order. */ - private final Queue notYetRead; + private final Queue notYetRead; private static class InFlightState { /** When request which yielded message was issues. */ @@ -438,7 +438,7 @@ public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) private long lastWatermarkMsSinceEpoch; /** The current message, or {@literal null} if none. */ - @Nullable private PubsubClient.IncomingMessage current; + @Nullable private IncomingMessage current; /** Stats only: System time (ms since epoch) we last logs stats, or -1 if never. */ private long lastLogTimestampMsSinceEpoch; @@ -715,7 +715,7 @@ private void pull() throws IOException { // Pull the next batch. // BLOCKs until received. - Collection receivedMessages = + Collection receivedMessages = pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true); if (receivedMessages.isEmpty()) { // Nothing available yet. Try again later. @@ -725,7 +725,7 @@ private void pull() throws IOException { lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch; // Capture the received messages. 
- for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { + for (IncomingMessage incomingMessage : receivedMessages) { notYetRead.add(incomingMessage); notYetReadBytes += incomingMessage.message().getData().size(); inFlight.put( @@ -986,7 +986,7 @@ public PubsubCheckpoint getCheckpointMark() { // in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that. List snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); List snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); - for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { + for (IncomingMessage incomingMessage : notYetRead) { snapshotNotYetReadIds.add(incomingMessage.ackId()); } if (outer.subscriptionPath == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java index e1e8711587a6..9550b77fdd7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; @@ -190,7 +189,7 @@ private List listSubscriptions(ProjectPath projectPath, TopicP /** Publish messages to {@link #topicPath()}. 
*/ public void publish(List messages) throws IOException { - List outgoingMessages = + List outgoingMessages = messages.stream().map(this::toOutgoingMessage).collect(toList()); pubsub.publish(eventsTopicPath, outgoingMessages); } @@ -202,8 +201,7 @@ public List pull() throws IOException { /** Pull up to {@code maxBatchSize} messages from {@link #subscriptionPath()}. */ public List pull(int maxBatchSize) throws IOException { - List messages = - pubsub.pull(0, subscriptionPath, maxBatchSize, true); + List messages = pubsub.pull(0, subscriptionPath, maxBatchSize, true); if (!messages.isEmpty()) { pubsub.acknowledge( subscriptionPath, @@ -298,8 +296,8 @@ public void checkIfAnySubscriptionExists(String project, Duration timeoutDuratio } } - private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) { - return PubsubClient.OutgoingMessage.of( + private OutgoingMessage toOutgoingMessage(PubsubMessage message) { + return OutgoingMessage.of( com.google.pubsub.v1.PubsubMessage.newBuilder() .setData(ByteString.copyFrom(message.getPayload())) .putAllAttributes(message.getAttributeMap()) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java index e6edc08b139d..0d00463829bb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.state.BagState; @@ -247,7 
+246,7 @@ public void waitForSuccess(Duration duration) throws IOException { private String pollForResultForDuration( SubscriptionPath signalSubscriptionPath, Duration duration) throws IOException { - List signal = null; + List signal = null; DateTime endPolling = DateTime.now().plus(duration.getMillis()); do { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 4dd719b5de1d..988dc00bde66 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -41,8 +41,6 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 0dc910f2c701..c1d2b695b6a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.AvroGeneratedUser; -import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index aad9729699a8..dd78566c819c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -41,8 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java index 6b920e839c59..392316102dde 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -25,8 +25,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f8cd86ee463c..818b97b9e762 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 43ecbdc5821e..ea18a67a5182 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;