From 61f4a96401b050f7f46eef060838e6d3758e42fe Mon Sep 17 00:00:00 2001 From: dpcollins Date: Sat, 28 Dec 2019 00:41:05 -0500 Subject: [PATCH 1/6] Move external PubsubIO hooks outside of PubsubIO. Also fix these so that they typecheck correctly without relying on java runtime duck typing. --- .../beam/sdk/io/gcp/pubsub/ExternalRead.java | 138 ++++++++++++++ .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 102 +++++++++++ .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 172 +----------------- 3 files changed, 242 insertions(+), 170 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java new file mode 100644 index 000000000000..c8f8611f5892 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.protobuf.ByteString; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link PubsubIO.Read} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class ExternalRead implements ExternalTransformRegistrar { + public ExternalRead() {} + + public static final String URN = "beam:external:java:pubsub:read:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, ReadBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + @Nullable private String topic; + @Nullable private String subscription; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + private boolean needsAttributes; + + public void setTopic(@Nullable String topic) { + this.topic = topic; + } + + public void setSubscription(@Nullable String subscription) { + this.subscription = subscription; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + + public void setWithAttributes(Boolean needsAttributes) { + // we must use Boolean instead of boolean because the external payload system + // inspects the native type of each coder urn, and BooleanCoder wants Boolean. + this.needsAttributes = needsAttributes; + } + } + + public static class ReadBuilder + implements ExternalTransformBuilder> { + public ReadBuilder() {} + + @Override + public PTransform> buildExternal(Configuration config) { + PubsubIO.Read.Builder readBuilder = new AutoValue_PubsubIO_Read.Builder<>(); + readBuilder.setCoder(ByteArrayCoder.of()); + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + readBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); + } + if (config.subscription != null) { + StaticValueProvider subscription = StaticValueProvider.of(config.subscription); + readBuilder.setSubscriptionProvider( + NestedValueProvider.of(subscription, PubsubSubscription::fromPath)); + } + if (config.idAttribute != null) { + readBuilder.setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + readBuilder.setTimestampAttribute(config.timestampAttribute); + } + readBuilder.setNeedsAttributes(config.needsAttributes); + if (config.needsAttributes) { + readBuilder.setParseFn(new ParsePayloadAsPubsubMessageProto()); + } else { + readBuilder.setParseFn( + new SimpleFunction() { + @Override + public byte[] apply(PubsubMessage input) { + return input.getPayload(); + } + }); + } + readBuilder.setNeedsMessageId(false); + return readBuilder.build(); + } + } + + // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. + private static class ParsePayloadAsPubsubMessageProto + extends SimpleFunction { + @Override + public byte[] apply(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + return message.build().toByteArray(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java new file mode 100644 index 000000000000..93ab52d8caac --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link PubsubIO.Write} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class ExternalWrite implements ExternalTransformRegistrar { + public ExternalWrite() {} + + public static final String URN = "beam:external:java:pubsub:write:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, WriteBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + private String topic; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder, PDone> { + public WriteBuilder() {} + + @Override + public PTransform, PDone> buildExternal(Configuration config) { + PubsubIO.Write.Builder writeBuilder = new AutoValue_PubsubIO_Write.Builder<>(); + writeBuilder.setFormatFn(new FormatFn()); + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); + } + if (config.idAttribute != null) { + writeBuilder.setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + writeBuilder.setTimestampAttribute(config.timestampAttribute); + } + return writeBuilder.build(); + } + } + + private static class FormatFn extends SimpleFunction { + @Override + public PubsubMessage apply(byte[] input) { + try { + com.google.pubsub.v1.PubsubMessage message = + com.google.pubsub.v1.PubsubMessage.parseFrom(input); + return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Could not decode Pubsub message", e); + } + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 0adea6b38622..cd6bba0f814c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -20,10 +20,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.Clock; -import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -42,11 +40,9 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; @@ -58,7 +54,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -718,8 +713,7 @@ public abstract static class Read extends PTransform> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder - implements ExternalTransformBuilder> { + abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topic); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory); @@ -750,85 +744,6 @@ abstract static class Builder abstract Builder setClock(@Nullable Clock clock); abstract Read build(); - - @Override - public PTransform> buildExternal(External.Configuration config) { - if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(config.topic); - setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); - } - if (config.subscription != null) { - StaticValueProvider subscription = StaticValueProvider.of(config.subscription); - setSubscriptionProvider( - NestedValueProvider.of(subscription, new SubscriptionTranslator())); - } - if (config.idAttribute != null) { - setIdAttribute(config.idAttribute); - } - if (config.timestampAttribute != null) { - setTimestampAttribute(config.timestampAttribute); - } - setPubsubClientFactory(FACTORY); - setNeedsAttributes(config.needsAttributes); - Coder coder = ByteArrayCoder.of(); - if (config.needsAttributes) { - SimpleFunction parseFn = - (SimpleFunction) new ParsePayloadAsPubsubMessageProto(); - setParseFn(parseFn); - setCoder(coder); - } else { - setParseFn(new ParsePayloadUsingCoder<>(coder)); - setCoder(coder); - } - setNeedsMessageId(false); - return build(); - } - } - - /** Exposes {@link PubSubIO.Read} as an external transform for cross-language usage. */ - @Experimental - @AutoService(ExternalTransformRegistrar.class) - public static class External implements ExternalTransformRegistrar { - - public static final String URN = "beam:external:java:pubsub:read:v1"; - - @Override - public Map> knownBuilders() { - return ImmutableMap.of(URN, AutoValue_PubsubIO_Read.Builder.class); - } - - /** Parameters class to expose the transform to an external SDK. */ - public static class Configuration { - - // All byte arrays are UTF-8 encoded strings - @Nullable private String topic; - @Nullable private String subscription; - @Nullable private String idAttribute; - @Nullable private String timestampAttribute; - private boolean needsAttributes; - - public void setTopic(@Nullable String topic) { - this.topic = topic; - } - - public void setSubscription(@Nullable String subscription) { - this.subscription = subscription; - } - - public void setIdLabel(@Nullable String idAttribute) { - this.idAttribute = idAttribute; - } - - public void setTimestampAttribute(@Nullable String timestampAttribute) { - this.timestampAttribute = timestampAttribute; - } - - public void setWithAttributes(Boolean needsAttributes) { - // we must use Boolean instead of boolean because the external payload system - // inspects the native type of each coder urn, and BooleanCoder wants Boolean. - this.needsAttributes = needsAttributes; - } - } } /** @@ -1051,8 +966,7 @@ public abstract static class Write extends PTransform, PDone> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder - implements ExternalTransformBuilder, PDone> { + abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory factory); @@ -1068,58 +982,6 @@ abstract static class Builder abstract Builder setFormatFn(SimpleFunction formatFn); abstract Write build(); - - @Override - public PTransform, PDone> buildExternal(External.Configuration config) { - if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(config.topic); - setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); - } - if (config.idAttribute != null) { - setIdAttribute(config.idAttribute); - } - if (config.timestampAttribute != null) { - setTimestampAttribute(config.timestampAttribute); - } - SimpleFunction parseFn = - (SimpleFunction) new FormatPayloadFromPubsubMessageProto(); - setFormatFn(parseFn); - return build(); - } - } - - /** Exposes {@link PubSubIO.Write} as an external transform for cross-language usage. */ - @Experimental - @AutoService(ExternalTransformRegistrar.class) - public static class External implements ExternalTransformRegistrar { - - public static final String URN = "beam:external:java:pubsub:write:v1"; - - @Override - public Map> knownBuilders() { - return ImmutableMap.of(URN, AutoValue_PubsubIO_Write.Builder.class); - } - - /** Parameters class to expose the transform to an external SDK. */ - public static class Configuration { - - // All byte arrays are UTF-8 encoded strings - private String topic; - @Nullable private String idAttribute; - @Nullable private String timestampAttribute; - - public void setTopic(String topic) { - this.topic = topic; - } - - public void setIdLabel(@Nullable String idAttribute) { - this.idAttribute = idAttribute; - } - - public void setTimestampAttribute(@Nullable String timestampAttribute) { - this.timestampAttribute = timestampAttribute; - } - } } /** @@ -1369,22 +1231,6 @@ public T apply(PubsubMessage input) { } } - private static class ParsePayloadAsPubsubMessageProto - extends SimpleFunction { - @Override - public byte[] apply(PubsubMessage input) { - Map attributes = input.getAttributeMap(); - com.google.pubsub.v1.PubsubMessage.Builder message = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(input.getPayload())); - // TODO(BEAM-8085) this should not be null - if (attributes != null) { - message.putAllAttributes(attributes); - } - return message.build().toByteArray(); - } - } - private static class FormatPayloadAsUtf8 extends SimpleFunction { @Override public PubsubMessage apply(String input) { @@ -1409,20 +1255,6 @@ public PubsubMessage apply(T input) { } } - private static class FormatPayloadFromPubsubMessageProto - extends SimpleFunction { - @Override - public PubsubMessage apply(byte[] input) { - try { - com.google.pubsub.v1.PubsubMessage message = - com.google.pubsub.v1.PubsubMessage.parseFrom(input); - return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException("Could not decode Pubsub message", e); - } - } - } - private static class IdentityMessageFn extends SimpleFunction { @Override public PubsubMessage apply(PubsubMessage input) { From 77b40c7989f8eec6202d2f623a6fcdae0d965407 Mon Sep 17 00:00:00 2001 From: git Date: Sat, 28 Dec 2019 02:28:10 -0500 Subject: [PATCH 2/6] Remove optionality and add sensible defaults to PubsubIO builders. This should have no user-facing effects. --- .../beam/sdk/io/gcp/pubsub/ExternalRead.java | 25 +- .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 7 +- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 259 ++++++------------ 3 files changed, 93 insertions(+), 198 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java index c8f8611f5892..e478b4546dae 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -85,7 +85,13 @@ public ReadBuilder() {} @Override public PTransform> buildExternal(Configuration config) { - PubsubIO.Read.Builder readBuilder = new AutoValue_PubsubIO_Read.Builder<>(); + PubsubIO.Read.Builder readBuilder; + if (config.needsAttributes) { + readBuilder = PubsubIO.Read.newBuilder(new ParsePayloadAsPubsubMessageProto()); + readBuilder.setNeedsAttributes(true); + } else { + readBuilder = PubsubIO.Read.newBuilder(PubsubMessage::getPayload); + } readBuilder.setCoder(ByteArrayCoder.of()); if (config.topic != null) { StaticValueProvider topic = StaticValueProvider.of(config.topic); @@ -102,26 +108,13 @@ public PTransform> buildExternal(Configuration confi if (config.timestampAttribute != null) { readBuilder.setTimestampAttribute(config.timestampAttribute); } - readBuilder.setNeedsAttributes(config.needsAttributes); - if (config.needsAttributes) { - readBuilder.setParseFn(new ParsePayloadAsPubsubMessageProto()); - } else { - readBuilder.setParseFn( - new SimpleFunction() { - @Override - public byte[] apply(PubsubMessage input) { - return input.getPayload(); - } - }); - } - readBuilder.setNeedsMessageId(false); return readBuilder.build(); } } // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. private static class ParsePayloadAsPubsubMessageProto - extends SimpleFunction { + implements SerializableFunction { @Override public byte[] apply(PubsubMessage input) { Map attributes = input.getAttributeMap(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 93ab52d8caac..813a01c64480 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -71,8 +71,7 @@ public WriteBuilder() {} @Override public PTransform, PDone> buildExternal(Configuration config) { - PubsubIO.Write.Builder writeBuilder = new AutoValue_PubsubIO_Write.Builder<>(); - writeBuilder.setFormatFn(new FormatFn()); + PubsubIO.Write.Builder writeBuilder = PubsubIO.Write.newBuilder(new FormatFn()); if (config.topic != null) { StaticValueProvider topic = StaticValueProvider.of(config.topic); writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); @@ -87,7 +86,7 @@ public PTransform, PDone> buildExternal(Configuration config } } - private static class FormatFn extends SimpleFunction { + private static class FormatFn implements SerializableFunction { @Override public PubsubMessage apply(byte[] input) { try { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index cd6bba0f814c..ee36fd20a08f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -45,13 +44,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; @@ -283,16 +282,6 @@ public String toString() { } } - /** Used to build a {@link ValueProvider} for {@link PubsubSubscription}. */ - private static class SubscriptionTranslator - implements SerializableFunction { - - @Override - public PubsubSubscription apply(String from) { - return PubsubSubscription.fromPath(from); - } - } - /** Used to build a {@link ValueProvider} for {@link SubscriptionPath}. */ private static class SubscriptionPathTranslator implements SerializableFunction { @@ -303,15 +292,6 @@ public SubscriptionPath apply(PubsubSubscription from) { } } - /** Used to build a {@link ValueProvider} for {@link PubsubTopic}. */ - private static class TopicTranslator implements SerializableFunction { - - @Override - public PubsubTopic apply(String from) { - return PubsubTopic.fromPath(from); - } - } - /** Used to build a {@link ValueProvider} for {@link TopicPath}. */ private static class TopicPathTranslator implements SerializableFunction { @@ -321,16 +301,6 @@ public TopicPath apply(PubsubTopic from) { } } - /** Used to build a {@link ValueProvider} for {@link ProjectPath}. */ - private static class ProjectPathTranslator - implements SerializableFunction { - - @Override - public ProjectPath apply(PubsubSubscription from) { - return PubsubClient.projectPathFromId(from.project); - } - } - /** Class representing a Cloud Pub/Sub Topic. */ public static class PubsubTopic implements Serializable { @@ -439,28 +409,13 @@ public String toString() { } } - /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */ - private static Read read() { - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .build(); - } - /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The * messages will only contain a {@link PubsubMessage#getPayload() payload}, but no {@link * PubsubMessage#getAttributeMap() attributes}. */ public static Read readMessages() { - return new AutoValue_PubsubIO_Read.Builder() - .setPubsubClientFactory(FACTORY) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) - .setParseFn(new IdentityMessageFn()) - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .build(); + return Read.newBuilder().setCoder(PubsubMessagePayloadOnlyCoder.of()).build(); } /** @@ -470,11 +425,8 @@ public static Read readMessages() { * PubsubMessage#getAttributeMap() attributes}. */ public static Read readMessagesWithMessageId() { - return new AutoValue_PubsubIO_Read.Builder() - .setPubsubClientFactory(FACTORY) + return Read.newBuilder() .setCoder(PubsubMessageWithMessageIdCoder.of()) - .setParseFn(new IdentityMessageFn()) - .setNeedsAttributes(false) .setNeedsMessageId(true) .build(); } @@ -485,12 +437,9 @@ public static Read readMessagesWithMessageId() { * PubsubMessage#getAttributeMap() attributes}. */ public static Read readMessagesWithAttributes() { - return new AutoValue_PubsubIO_Read.Builder() - .setPubsubClientFactory(FACTORY) + return Read.newBuilder() .setCoder(PubsubMessageWithAttributesCoder.of()) - .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(true) - .setNeedsMessageId(false) .build(); } @@ -501,10 +450,8 @@ public static Read readMessagesWithAttributes() { * messageId} from PubSub. */ public static Read readMessagesWithAttributesAndMessageId() { - return new AutoValue_PubsubIO_Read.Builder() - .setPubsubClientFactory(FACTORY) + return Read.newBuilder() .setCoder(PubsubMessageWithAttributesAndMessageIdCoder.of()) - .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(true) .setNeedsMessageId(true) .build(); @@ -515,12 +462,9 @@ public static Read readMessagesWithAttributesAndMessageId() { * Pub/Sub stream. */ public static Read readStrings() { - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) + return Read.newBuilder( + (PubsubMessage message) -> new String(message.getPayload(), StandardCharsets.UTF_8)) .setCoder(StringUtf8Coder.of()) - .setParseFn(new ParsePayloadAsUtf8()) .build(); } @@ -533,13 +477,7 @@ public static Read readProtos(Class messageClass) { // We should not be relying on the fact that ProtoCoder's wire format is identical to // the protobuf wire format, as the wire format is not part of a coder's API. ProtoCoder coder = ProtoCoder.of(messageClass); - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .setCoder(coder) - .setParseFn(new ParsePayloadUsingCoder<>(coder)) - .build(); + return Read.newBuilder(parsePayloadUsingCoder(coder)).setCoder(coder).build(); } /** @@ -551,13 +489,7 @@ public static Read readAvros(Class clazz) { // We should not be relying on the fact that AvroCoder's wire format is identical to // the Avro wire format, as the wire format is not part of a coder's API. AvroCoder coder = AvroCoder.of(clazz); - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .setCoder(coder) - .setParseFn(new ParsePayloadUsingCoder<>(coder)) - .build(); + return Read.newBuilder(parsePayloadUsingCoder(coder)).setCoder(coder).build(); } /** @@ -566,13 +498,7 @@ public static Read readAvros(Class clazz) { */ public static Read readMessagesWithCoderAndParseFn( Coder coder, SimpleFunction parseFn) { - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .setCoder(coder) - .setParseFn(parseFn) - .build(); + return Read.newBuilder(parseFn).setCoder(coder).build(); } /** @@ -586,15 +512,13 @@ public static Read readMessagesWithCoderAndParseFn( public static Read readAvroGenericRecords(org.apache.avro.Schema avroSchema) { Schema schema = AvroUtils.getSchema(GenericRecord.class, avroSchema); AvroCoder coder = AvroCoder.of(GenericRecord.class, avroSchema); - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .setBeamSchema(schema) - .setTypeDescriptor(TypeDescriptor.of(GenericRecord.class)) - .setToRowFn(AvroUtils.getToRowFunction(GenericRecord.class, avroSchema)) - .setFromRowFn(AvroUtils.getFromRowFunction(GenericRecord.class)) - .setParseFn(new ParsePayloadUsingCoder<>(coder)) + return Read.newBuilder(parsePayloadUsingCoder(coder)) + .setCoder( + SchemaCoder.of( + schema, + TypeDescriptor.of(GenericRecord.class), + AvroUtils.getToRowFunction(GenericRecord.class, avroSchema), + AvroUtils.getFromRowFunction(GenericRecord.class))) .build(); } @@ -613,26 +537,19 @@ public static Read readAvrosWithBeamSchema(Class clazz) { org.apache.avro.Schema avroSchema = ReflectData.get().getSchema(clazz); AvroCoder coder = AvroCoder.of(clazz); Schema schema = AvroUtils.getSchema(clazz, null); - return new AutoValue_PubsubIO_Read.Builder() - .setNeedsAttributes(false) - .setNeedsMessageId(false) - .setPubsubClientFactory(FACTORY) - .setBeamSchema(schema) - .setTypeDescriptor(TypeDescriptor.of(clazz)) - .setToRowFn(AvroUtils.getToRowFunction(clazz, avroSchema)) - .setFromRowFn(AvroUtils.getFromRowFunction(clazz)) - .setParseFn(new ParsePayloadUsingCoder<>(coder)) + return Read.newBuilder(parsePayloadUsingCoder(coder)) + .setCoder( + SchemaCoder.of( + schema, + TypeDescriptor.of(clazz), + AvroUtils.getToRowFunction(clazz, avroSchema), + AvroUtils.getFromRowFunction(clazz))) .build(); } - /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ - private static Write write() { - return new AutoValue_PubsubIO_Write.Builder().build(); - } - /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ public static Write writeMessages() { - return PubsubIO.write().withFormatFn(new IdentityMessageFn()); + return Write.newBuilder().build(); } /** @@ -640,7 +557,10 @@ public static Write writeMessages() { * stream. */ public static Write writeStrings() { - return PubsubIO.write().withFormatFn(new FormatPayloadAsUtf8()); + return Write.newBuilder( + (String string) -> + new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), ImmutableMap.of())) + .build(); } /** @@ -649,8 +569,7 @@ public static Write writeStrings() { */ public static Write writeProtos(Class messageClass) { // TODO: Like in readProtos(), stop using ProtoCoder and instead format the payload directly. - return PubsubIO.write() - .withFormatFn(new FormatPayloadUsingCoder<>(ProtoCoder.of(messageClass))); + return Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass))).build(); } /** @@ -659,16 +578,15 @@ public static Write writeProtos(Class messageClass) { */ public static Write writeAvros(Class clazz) { // TODO: Like in readAvros(), stop using AvroCoder and instead format the payload directly. - return PubsubIO.write().withFormatFn(new FormatPayloadUsingCoder<>(AvroCoder.of(clazz))); + return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz))).build(); } - /** Implementation of {@link #read}. */ + /** Implementation of read methods. */ @AutoValue public abstract static class Read extends PTransform> { @Nullable abstract ValueProvider getTopicProvider(); - @Nullable abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); @Nullable @@ -683,12 +601,11 @@ public abstract static class Read extends PTransform> abstract String getIdAttribute(); /** The coder used to decode each record. */ - @Nullable abstract Coder getCoder(); /** User function for parsing PubsubMessage object. */ @Nullable - abstract SimpleFunction getParseFn(); + abstract SerializableFunction getParseFn(); @Nullable @Experimental(Kind.SCHEMAS) @@ -712,6 +629,19 @@ public abstract static class Read extends PTransform> abstract Builder toBuilder(); + static Builder newBuilder(SerializableFunction parseFn) { + Builder builder = new AutoValue_PubsubIO_Read.Builder(); + builder.setParseFn(parseFn); + builder.setPubsubClientFactory(FACTORY); + builder.setNeedsAttributes(false); + builder.setNeedsMessageId(false); + return builder; + } + + static Builder newBuilder() { + return newBuilder(x -> x); + } + @AutoValue.Builder abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topic); @@ -726,7 +656,7 @@ abstract static class Builder { abstract Builder setCoder(Coder coder); - abstract Builder setParseFn(SimpleFunction parseFn); + abstract Builder setParseFn(SerializableFunction parseFn); @Experimental(Kind.SCHEMAS) abstract Builder setBeamSchema(@Nullable Schema beamSchema); @@ -741,7 +671,7 @@ abstract static class Builder { abstract Builder setNeedsMessageId(boolean needsMessageId); - abstract Builder setClock(@Nullable Clock clock); + abstract Builder setClock(Clock clock); abstract Read build(); } @@ -767,7 +697,7 @@ public Read fromSubscription(ValueProvider subscription) { } return toBuilder() .setSubscriptionProvider( - NestedValueProvider.of(subscription, new SubscriptionTranslator())) + NestedValueProvider.of(subscription, PubsubSubscription::fromPath)) .build(); } @@ -793,7 +723,7 @@ public Read fromTopic(ValueProvider topic) { PubsubTopic.fromPath(topic.get()); } return toBuilder() - .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())) + .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .build(); } @@ -896,7 +826,7 @@ public PCollection expand(PBegin input) { PubsubUnboundedSource source = new PubsubUnboundedSource( getClock(), - Optional.ofNullable(getPubsubClientFactory()).orElse(FACTORY), + getPubsubClientFactory(), null /* always get project from runtime PipelineOptions */, topicPath, subscriptionPath, @@ -904,10 +834,9 @@ public PCollection expand(PBegin input) { getIdAttribute(), getNeedsAttributes(), getNeedsMessageId()); - PCollection read = input.apply(source).apply(MapElements.via(getParseFn())); - return (getBeamSchema() != null) - ? read.setSchema(getBeamSchema(), getTypeDescriptor(), getToRowFn(), getFromRowFn()) - : read.setCoder(getCoder()); + PCollection read = + input.apply(source).apply(MapElements.into(new TypeDescriptor() {}).via(getParseFn())); + return read.setCoder(getCoder()); } @Override @@ -926,7 +855,7 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Disallow construction of utility class. */ private PubsubIO() {} - /** Implementation of {@link #write}. */ + /** Implementation of write methods. */ @AutoValue public abstract static class Write extends PTransform, PDone> { /** @@ -940,7 +869,6 @@ public abstract static class Write extends PTransform, PDone> @Nullable abstract ValueProvider getTopicProvider(); - @Nullable abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); /** the batch size for bulk submissions to pubsub. */ @@ -960,11 +888,21 @@ public abstract static class Write extends PTransform, PDone> abstract String getIdAttribute(); /** The format function for input PubsubMessage objects. */ - @Nullable - abstract SimpleFunction getFormatFn(); + abstract SerializableFunction getFormatFn(); abstract Builder toBuilder(); + static Builder newBuilder(SerializableFunction formatFn) { + Builder builder = new AutoValue_PubsubIO_Write.Builder(); + builder.setPubsubClientFactory(FACTORY); + builder.setFormatFn(formatFn); + return builder; + } + + static Builder newBuilder() { + return newBuilder(x -> x); + } + @AutoValue.Builder abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); @@ -979,7 +917,7 @@ abstract static class Builder { abstract Builder setIdAttribute(String idAttribute); - abstract Builder setFormatFn(SimpleFunction formatFn); + abstract Builder setFormatFn(SerializableFunction formatFn); abstract Write build(); } @@ -997,7 +935,7 @@ public Write to(String topic) { /** Like {@code topic()} but with a {@link ValueProvider}. */ public Write to(ValueProvider topic) { return toBuilder() - .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())) + .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .build(); } @@ -1086,10 +1024,10 @@ public PDone expand(PCollection input) { return PDone.in(input.getPipeline()); case UNBOUNDED: return input - .apply(MapElements.via(getFormatFn())) + .apply(MapElements.into(new TypeDescriptor() {}).via(getFormatFn())) .apply( new PubsubUnboundedSink( - Optional.ofNullable(getPubsubClientFactory()).orElse(FACTORY), + getPubsubClientFactory(), NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), getTimestampAttribute(), getIdAttribute(), @@ -1139,8 +1077,7 @@ public void startBundle(StartBundleContext c) throws IOException { // NOTE: idAttribute is ignored. this.pubsubClient = - Optional.ofNullable(getPubsubClientFactory()) - .orElse(FACTORY) + getPubsubClientFactory() .newClient( getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class)); } @@ -1207,58 +1144,24 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - private static class ParsePayloadAsUtf8 extends SimpleFunction { - @Override - public String apply(PubsubMessage input) { - return new String(input.getPayload(), StandardCharsets.UTF_8); - } - } - - private static class ParsePayloadUsingCoder extends SimpleFunction { - private Coder coder; - - public ParsePayloadUsingCoder(Coder coder) { - this.coder = coder; - } - - @Override - public T apply(PubsubMessage input) { + private static SerializableFunction parsePayloadUsingCoder(Coder coder) { + return message -> { try { - return CoderUtils.decodeFromByteArray(coder, input.getPayload()); + return CoderUtils.decodeFromByteArray(coder, message.getPayload()); } catch (CoderException e) { throw new RuntimeException("Could not decode Pubsub message", e); } - } - } - - private static class FormatPayloadAsUtf8 extends SimpleFunction { - @Override - public PubsubMessage apply(String input) { - return new PubsubMessage(input.getBytes(StandardCharsets.UTF_8), ImmutableMap.of()); - } + }; } - private static class FormatPayloadUsingCoder extends SimpleFunction { - private Coder coder; - - public FormatPayloadUsingCoder(Coder coder) { - this.coder = coder; - } - - @Override - public PubsubMessage apply(T input) { + private static SerializableFunction formatPayloadUsingCoder( + Coder coder) { + return input -> { try { return new PubsubMessage(CoderUtils.encodeToByteArray(coder, input), ImmutableMap.of()); } catch (CoderException e) { - throw new RuntimeException("Could not decode Pubsub message", e); + throw new RuntimeException("Could not encode Pubsub message", e); } - } - } - - private static class IdentityMessageFn extends SimpleFunction { - @Override - public PubsubMessage apply(PubsubMessage input) { - return input; - } + }; } } From 87c386087897fadcb4d4a78604643e1b832f23ee Mon Sep 17 00:00:00 2001 From: git Date: Sat, 28 Dec 2019 03:10:16 -0500 Subject: [PATCH 3/6] Move PubsubBoundedWriter outside of PubsubIO. This should have no user-visible effects. --- .../io/gcp/pubsub/PubsubBoundedWriter.java | 138 ++++++++++++++++++ .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 133 +---------------- 2 files changed, 142 insertions(+), 129 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java new file mode 100644 index 000000000000..01369d7fcab4 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.naming.SizeLimitExceededException; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; + +/** Writer to Pubsub which batches messages from bounded collections. */ +class PubsubBoundedWriter extends DoFn { + /** + * Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into + * four bytes. + */ + private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3; + + private static final int MAX_PUBLISH_BATCH_SIZE = 100; + + private transient List output; + private transient PubsubClient pubsubClient; + private transient int currentOutputBytes; + + private final int maxPublishBatchByteSize; + private final int maxPublishBatchSize; + private final PubsubIO.Write write; + + private PubsubBoundedWriter(PubsubIO.Write write) { + Preconditions.checkNotNull(write.getTopicProvider()); + this.maxPublishBatchSize = + MoreObjects.firstNonNull(write.getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE); + this.maxPublishBatchByteSize = + MoreObjects.firstNonNull(write.getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT); + this.write = write; + } + + static PubsubBoundedWriter of(PubsubIO.Write write) { + return new PubsubBoundedWriter<>(write); + } + + @StartBundle + public void startBundle(StartBundleContext c) throws IOException { + this.output = new ArrayList<>(); + this.currentOutputBytes = 0; + + // NOTE: idAttribute is ignored. + this.pubsubClient = + write + .getPubsubClientFactory() + .newClient( + write.getTimestampAttribute(), + null, + c.getPipelineOptions().as(PubsubOptions.class)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { + byte[] payload; + PubsubMessage message = write.getFormatFn().apply(c.element()); + payload = message.getPayload(); + Map attributes = message.getAttributeMap(); + + if (payload.length > maxPublishBatchByteSize) { + String msg = + String.format( + "Pub/Sub message size (%d) exceeded maximum batch size (%d)", + payload.length, maxPublishBatchByteSize); + throw new SizeLimitExceededException(msg); + } + + // Checking before adding the message stops us from violating the max bytes + if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) + || (output.size() >= maxPublishBatchSize)) { + publish(); + } + + // NOTE: The record id is always null. + output.add( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload)) + .putAllAttributes(attributes) + .build(), + c.timestamp().getMillis(), + null)); + currentOutputBytes += payload.length; + } + + @FinishBundle + public void finishBundle() throws IOException { + if (!output.isEmpty()) { + publish(); + } + output = null; + currentOutputBytes = 0; + pubsubClient.close(); + pubsubClient = null; + } + + private void publish() throws IOException { + PubsubTopic topic = write.getTopicProvider().get(); + int n = pubsubClient.publish(PubsubClient.topicPathFromPath(topic.asPath()), output); + checkState(n == output.size()); + output.clear(); + currentOutputBytes = 0; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.delegate(write); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index ee36fd20a08f..a90d0fea594d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -17,22 +17,15 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import com.google.api.client.util.Clock; import com.google.auto.value.AutoValue; import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -import javax.naming.SizeLimitExceededException; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.PipelineRunner; @@ -43,7 +36,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.options.ValueProvider; @@ -52,7 +44,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.utils.AvroUtils; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -587,11 +578,11 @@ public abstract static class Read extends PTransform> @Nullable abstract ValueProvider getTopicProvider(); - abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); - @Nullable abstract ValueProvider getSubscriptionProvider(); + abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); + /** The name of the message attribute to read timestamps from. */ @Nullable abstract String getTimestampAttribute(); @@ -858,14 +849,6 @@ private PubsubIO() {} /** Implementation of write methods. */ @AutoValue public abstract static class Write extends PTransform, PDone> { - /** - * Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into - * four bytes. - */ - private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = ((10 * 1000 * 1000) / 4) * 3; - - private static final int MAX_PUBLISH_BATCH_SIZE = 100; - @Nullable abstract ValueProvider getTopicProvider(); @@ -998,15 +981,6 @@ public Write withIdAttribute(String idAttribute) { return toBuilder().setIdAttribute(idAttribute).build(); } - /** - * Used to write a PubSub message together with PubSub attributes. The user-supplied format - * function translates the input type T to a PubsubMessage object, which is used by the sink to - * separately set the PubSub message's payload and attributes. - */ - private Write withFormatFn(SimpleFunction formatFn) { - return toBuilder().setFormatFn(formatFn).build(); - } - @Override public PDone expand(PCollection input) { if (getTopicProvider() == null) { @@ -1015,12 +989,8 @@ public PDone expand(PCollection input) { switch (input.isBounded()) { case BOUNDED: - input.apply( - ParDo.of( - new PubsubBoundedWriter( - MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull( - getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)))); + // NOTE: idAttribute is ignored. + input.apply(ParDo.of(PubsubBoundedWriter.of(this))); return PDone.in(input.getPipeline()); case UNBOUNDED: return input @@ -1047,101 +1017,6 @@ public void populateDisplayData(DisplayData.Builder builder) { populateCommonDisplayData( builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); } - - /** - * Writer to Pubsub which batches messages from bounded collections. - * - *

Public so can be suppressed by runners. - */ - public class PubsubBoundedWriter extends DoFn { - private transient List output; - private transient PubsubClient pubsubClient; - private transient int currentOutputBytes; - - private int maxPublishBatchByteSize; - private int maxPublishBatchSize; - - PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) { - this.maxPublishBatchSize = maxPublishBatchSize; - this.maxPublishBatchByteSize = maxPublishBatchByteSize; - } - - PubsubBoundedWriter() { - this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT); - } - - @StartBundle - public void startBundle(StartBundleContext c) throws IOException { - this.output = new ArrayList<>(); - this.currentOutputBytes = 0; - - // NOTE: idAttribute is ignored. - this.pubsubClient = - getPubsubClientFactory() - .newClient( - getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { - byte[] payload; - PubsubMessage message = getFormatFn().apply(c.element()); - payload = message.getPayload(); - Map attributes = message.getAttributeMap(); - - if (payload.length > maxPublishBatchByteSize) { - String msg = - String.format( - "Pub/Sub message size (%d) exceeded maximum batch size (%d)", - payload.length, maxPublishBatchByteSize); - throw new SizeLimitExceededException(msg); - } - - // Checking before adding the message stops us from violating the max bytes - if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) - || (output.size() >= maxPublishBatchSize)) { - publish(); - } - - // NOTE: The record id is always null. - output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), - c.timestamp().getMillis(), - null)); - currentOutputBytes += payload.length; - } - - @FinishBundle - public void finishBundle() throws IOException { - if (!output.isEmpty()) { - publish(); - } - output = null; - currentOutputBytes = 0; - pubsubClient.close(); - pubsubClient = null; - } - - private void publish() throws IOException { - PubsubTopic topic = getTopicProvider().get(); - int n = - pubsubClient.publish( - PubsubClient.topicPathFromName(topic.project, topic.topic), output); - checkState(n == output.size()); - output.clear(); - currentOutputBytes = 0; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.delegate(Write.this); - } - } } private static SerializableFunction parsePayloadUsingCoder(Coder coder) { From 485d3952e771072d526b94d1a5a9ecc23f3e9125 Mon Sep 17 00:00:00 2001 From: git Date: Sat, 28 Dec 2019 03:17:38 -0500 Subject: [PATCH 4/6] Change the default pubsub client to the GRPC client from the json one. --- .../main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index a90d0fea594d..b830a5e8a4b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -84,7 +84,7 @@ public class PubsubIO { private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); /** Factory for creating pubsub client to manage transport. */ - private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubGrpcClient.FACTORY; /** * Project IDs must contain 6-63 lowercase letters, digits, or dashes. IDs must start with a From a26ac21006caee1477514cfe5b3ec122e0cf3c51 Mon Sep 17 00:00:00 2001 From: git Date: Sat, 28 Dec 2019 03:58:40 -0500 Subject: [PATCH 5/6] Extract IncomingMessage and OutgoingMessage from PubsubClient. --- .../sdk/io/gcp/pubsub/IncomingMessage.java | 56 +++++++++++++ .../io/gcp/pubsub/IncomingMessageCoder.java | 61 ++++++++++++++ .../sdk/io/gcp/pubsub/OutgoingMessage.java | 60 ++++++++++++++ .../io/gcp/pubsub/OutgoingMessageCoder.java | 58 +++++++++++++ .../io/gcp/pubsub/PubsubBoundedWriter.java | 1 - .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 81 ------------------- .../io/gcp/pubsub/PubsubUnboundedSink.java | 1 - .../io/gcp/pubsub/PubsubUnboundedSource.java | 10 +-- .../beam/sdk/io/gcp/pubsub/TestPubsub.java | 10 +-- .../sdk/io/gcp/pubsub/TestPubsubSignal.java | 3 +- .../io/gcp/pubsub/PubsubGrpcClientTest.java | 2 - .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 1 - .../io/gcp/pubsub/PubsubJsonClientTest.java | 2 - .../io/gcp/pubsub/PubsubTestClientTest.java | 2 - .../gcp/pubsub/PubsubUnboundedSinkTest.java | 1 - .../gcp/pubsub/PubsubUnboundedSourceTest.java | 1 - 16 files changed, 245 insertions(+), 105 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java new file mode 100644 index 000000000000..dc36c7aeccc0 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.value.AutoValue; +import com.google.pubsub.v1.PubsubMessage; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** A message received from Pubsub. */ +@AutoValue +@DefaultCoder(IncomingMessageCoder.class) +abstract class IncomingMessage { + + /** Underlying Message. */ + public abstract com.google.pubsub.v1.PubsubMessage message(); + + /** + * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom + * timestamp associated with the message. + */ + public abstract long timestampMsSinceEpoch(); + + /** Timestamp (in system time) at which we requested the message (ms since epoch). */ + public abstract long requestTimeMsSinceEpoch(); + + /** Id to pass back to Pubsub to acknowledge receipt of this message. */ + public abstract String ackId(); + + /** Id to pass to the runner to distinguish this message from all others. */ + public abstract String recordId(); + + public static IncomingMessage of( + PubsubMessage message, + long timestampMsSinceEpoch, + long requestTimeMsSinceEpoch, + String ackId, + String recordId) { + return new AutoValue_IncomingMessage( + message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java new file mode 100644 index 000000000000..a4c64c091c9a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/IncomingMessageCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.pubsub.v1.PubsubMessage; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for IncomingMessage. */ +public class IncomingMessageCoder extends CustomCoder { + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder( + TypeDescriptor.of(IncomingMessage.class), new IncomingMessageCoder()); + } + + public static IncomingMessageCoder of() { + return new IncomingMessageCoder(); + } + + @Override + public void encode(IncomingMessage value, OutputStream outStream) throws IOException { + ProtoCoder.of(PubsubMessage.class).encode(value.message(), outStream); + VarLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream); + VarLongCoder.of().encode(value.requestTimeMsSinceEpoch(), outStream); + StringUtf8Coder.of().encode(value.ackId(), outStream); + StringUtf8Coder.of().encode(value.recordId(), outStream); + } + + @Override + public IncomingMessage decode(InputStream inStream) throws IOException { + return IncomingMessage.of( + ProtoCoder.of(PubsubMessage.class).decode(inStream), + VarLongCoder.of().decode(inStream), + VarLongCoder.of().decode(inStream), + StringUtf8Coder.of().decode(inStream), + StringUtf8Coder.of().decode(inStream)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java new file mode 100644 index 000000000000..7d855f85ec42 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.ByteString; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** A message to be sent to Pubsub. */ +@AutoValue +@DefaultCoder(OutgoingMessageCoder.class) +abstract class OutgoingMessage { + + /** Underlying Message. May not have publish timestamp set. */ + public abstract com.google.pubsub.v1.PubsubMessage message(); + + /** Timestamp for element (ms since epoch). */ + public abstract long timestampMsSinceEpoch(); + + /** + * If using an id attribute, the record id to associate with this record's metadata so the + * receiver can reject duplicates. Otherwise {@literal null}. + */ + @Nullable + public abstract String recordId(); + + public static OutgoingMessage of( + com.google.pubsub.v1.PubsubMessage message, + long timestampMsSinceEpoch, + @Nullable String recordId) { + return new AutoValue_OutgoingMessage(message, timestampMsSinceEpoch, recordId); + } + + public static OutgoingMessage of( + PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { + com.google.pubsub.v1.PubsubMessage.Builder builder = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(message.getPayload())); + if (message.getAttributeMap() != null) { + builder.putAllAttributes(message.getAttributeMap()); + } + return of(builder.build(), timestampMsSinceEpoch, recordId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java new file mode 100644 index 000000000000..2436324529bd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/OutgoingMessageCoder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.pubsub.v1.PubsubMessage; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for OutgoingMessage. */ +public class OutgoingMessageCoder extends CustomCoder { + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder( + TypeDescriptor.of(OutgoingMessage.class), new OutgoingMessageCoder()); + } + + public static OutgoingMessageCoder of() { + return new OutgoingMessageCoder(); + } + + @Override + public void encode(OutgoingMessage value, OutputStream outStream) throws IOException { + ProtoCoder.of(PubsubMessage.class).encode(value.message(), outStream); + VarLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream); + NullableCoder.of(StringUtf8Coder.of()).encode(value.recordId(), outStream); + } + + @Override + public OutgoingMessage decode(InputStream inStream) throws IOException { + return OutgoingMessage.of( + ProtoCoder.of(PubsubMessage.class).decode(inStream), + VarLongCoder.of().decode(inStream), + NullableCoder.of(StringUtf8Coder.of()).decode(inStream)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java index 01369d7fcab4..f0ae47f585c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubBoundedWriter.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import javax.naming.SizeLimitExceededException; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 6f0f54d8b21e..ab1fc744045e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -21,9 +21,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.DateTime; -import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; @@ -294,84 +291,6 @@ public static TopicPath topicPathFromName(String projectId, String topicName) { return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); } - /** - * A message to be sent to Pubsub. - * - *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java - * serialization is never used for non-test clients. - */ - @AutoValue - public abstract static class OutgoingMessage implements Serializable { - - /** Underlying Message. May not have publish timestamp set. */ - public abstract PubsubMessage message(); - - /** Timestamp for element (ms since epoch). */ - public abstract long timestampMsSinceEpoch(); - - /** - * If using an id attribute, the record id to associate with this record's metadata so the - * receiver can reject duplicates. Otherwise {@literal null}. - */ - @Nullable - public abstract String recordId(); - - public static OutgoingMessage of( - PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { - return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId); - } - - public static OutgoingMessage of( - org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message, - long timestampMsSinceEpoch, - @Nullable String recordId) { - PubsubMessage.Builder builder = - PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload())); - if (message.getAttributeMap() != null) { - builder.putAllAttributes(message.getAttributeMap()); - } - return of(builder.build(), timestampMsSinceEpoch, recordId); - } - } - - /** - * A message received from Pubsub. - * - *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java - * serialization is never used for non-test clients. - */ - @AutoValue - abstract static class IncomingMessage implements Serializable { - - /** Underlying Message. */ - public abstract PubsubMessage message(); - - /** - * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom - * timestamp associated with the message. - */ - public abstract long timestampMsSinceEpoch(); - - /** Timestamp (in system time) at which we requested the message (ms since epoch). */ - public abstract long requestTimeMsSinceEpoch(); - - /** Id to pass back to Pubsub to acknowledge receipt of this message. */ - public abstract String ackId(); - - /** Id to pass to the runner to distinguish this message from all others. */ - public abstract String recordId(); - - public static IncomingMessage of( - PubsubMessage message, - long timestampMsSinceEpoch, - long requestTimeMsSinceEpoch, - String ackId, - String recordId) { - return new AutoValue_PubsubClient_IncomingMessage( - message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId); - } - } - /** * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages published. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 8be8c56c731f..57da4c3bee40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 230161ca1433..e338eab94b54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -384,7 +384,7 @@ static class PubsubReader extends UnboundedSource.UnboundedReader * Messages we have received from Pubsub and not yet delivered downstream. We preserve their * order. */ - private final Queue notYetRead; + private final Queue notYetRead; private static class InFlightState { /** When request which yielded message was issues. */ @@ -438,7 +438,7 @@ public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) private long lastWatermarkMsSinceEpoch; /** The current message, or {@literal null} if none. */ - @Nullable private PubsubClient.IncomingMessage current; + @Nullable private IncomingMessage current; /** Stats only: System time (ms since epoch) we last logs stats, or -1 if never. */ private long lastLogTimestampMsSinceEpoch; @@ -715,7 +715,7 @@ private void pull() throws IOException { // Pull the next batch. // BLOCKs until received. - Collection receivedMessages = + Collection receivedMessages = pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true); if (receivedMessages.isEmpty()) { // Nothing available yet. Try again later. @@ -725,7 +725,7 @@ private void pull() throws IOException { lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch; // Capture the received messages. - for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { + for (IncomingMessage incomingMessage : receivedMessages) { notYetRead.add(incomingMessage); notYetReadBytes += incomingMessage.message().getData().size(); inFlight.put( @@ -986,7 +986,7 @@ public PubsubCheckpoint getCheckpointMark() { // in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that. List snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); List snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); - for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { + for (IncomingMessage incomingMessage : notYetRead) { snapshotNotYetReadIds.add(incomingMessage.ackId()); } if (outer.subscriptionPath == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java index e1e8711587a6..9550b77fdd7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; @@ -190,7 +189,7 @@ private List listSubscriptions(ProjectPath projectPath, TopicP /** Publish messages to {@link #topicPath()}. */ public void publish(List messages) throws IOException { - List outgoingMessages = + List outgoingMessages = messages.stream().map(this::toOutgoingMessage).collect(toList()); pubsub.publish(eventsTopicPath, outgoingMessages); } @@ -202,8 +201,7 @@ public List pull() throws IOException { /** Pull up to {@code maxBatchSize} messages from {@link #subscriptionPath()}. */ public List pull(int maxBatchSize) throws IOException { - List messages = - pubsub.pull(0, subscriptionPath, maxBatchSize, true); + List messages = pubsub.pull(0, subscriptionPath, maxBatchSize, true); if (!messages.isEmpty()) { pubsub.acknowledge( subscriptionPath, @@ -298,8 +296,8 @@ public void checkIfAnySubscriptionExists(String project, Duration timeoutDuratio } } - private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) { - return PubsubClient.OutgoingMessage.of( + private OutgoingMessage toOutgoingMessage(PubsubMessage message) { + return OutgoingMessage.of( com.google.pubsub.v1.PubsubMessage.newBuilder() .setData(ByteString.copyFrom(message.getPayload())) .putAllAttributes(message.getAttributeMap()) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java index de4a71557dd1..95ac7538a431 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.state.BagState; @@ -244,7 +243,7 @@ public void waitForSuccess(Duration duration) throws IOException { private String pollForResultForDuration( SubscriptionPath signalSubscriptionPath, Duration duration) throws IOException { - List signal = null; + List signal = null; DateTime endPolling = DateTime.now().plus(duration.getMillis()); do { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 4dd719b5de1d..988dc00bde66 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -41,8 +41,6 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 0dc910f2c701..c1d2b695b6a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.AvroGeneratedUser; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index aad9729699a8..dd78566c819c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -41,8 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java index 6b920e839c59..392316102dde 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -25,8 +25,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f8cd86ee463c..818b97b9e762 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 43ecbdc5821e..ea18a67a5182 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; From 7f992ca2946fb036d988bbee7c83acf2a7b717f2 Mon Sep 17 00:00:00 2001 From: git Date: Sat, 28 Dec 2019 05:28:57 -0500 Subject: [PATCH 6/6] Modify pubsub sinks to use the proto PubsubMessage internally, and add a public writeWireMessages() method to PubsubIO. --- .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 5 +- .../sdk/io/gcp/pubsub/OutgoingMessage.java | 9 +-- .../io/gcp/pubsub/PubsubBoundedWriter.java | 38 ++++----- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 78 ++++++++++++------- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 13 ++++ .../io/gcp/pubsub/PubsubUnboundedSink.java | 46 +---------- .../io/gcp/pubsub/PubsubIOExternalTest.java | 9 ++- .../gcp/pubsub/PubsubUnboundedSinkTest.java | 26 ++----- 8 files changed, 94 insertions(+), 130 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 813a01c64480..fa8214ed0f11 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -19,6 +19,7 @@ import com.google.auto.service.AutoService; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.pubsub.v1.PubsubMessage; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -90,9 +91,7 @@ private static class FormatFn implements SerializableFunction extends DoFn { +class PubsubBoundedWriter extends DoFn { /** * Max batch byte size. Messages are base64 encoded which encodes each set of three bytes into * four bytes. @@ -47,9 +46,9 @@ class PubsubBoundedWriter extends DoFn { private final int maxPublishBatchByteSize; private final int maxPublishBatchSize; - private final PubsubIO.Write write; + private final PubsubIO.Write write; - private PubsubBoundedWriter(PubsubIO.Write write) { + private PubsubBoundedWriter(PubsubIO.Write write) { Preconditions.checkNotNull(write.getTopicProvider()); this.maxPublishBatchSize = MoreObjects.firstNonNull(write.getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE); @@ -58,8 +57,8 @@ private PubsubBoundedWriter(PubsubIO.Write write) { this.write = write; } - static PubsubBoundedWriter of(PubsubIO.Write write) { - return new PubsubBoundedWriter<>(write); + static PubsubBoundedWriter of(PubsubIO.Write write) { + return new PubsubBoundedWriter(write); } @StartBundle @@ -78,36 +77,25 @@ public void startBundle(StartBundleContext c) throws IOException { } @ProcessElement - public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { - byte[] payload; - PubsubMessage message = write.getFormatFn().apply(c.element()); - payload = message.getPayload(); - Map attributes = message.getAttributeMap(); - - if (payload.length > maxPublishBatchByteSize) { + public void processElement(@Element PubsubMessage message, ProcessContext c) + throws IOException, SizeLimitExceededException { + if (message.getData().size() > maxPublishBatchByteSize) { String msg = String.format( "Pub/Sub message size (%d) exceeded maximum batch size (%d)", - payload.length, maxPublishBatchByteSize); + message.getData().size(), maxPublishBatchByteSize); throw new SizeLimitExceededException(msg); } // Checking before adding the message stops us from violating the max bytes - if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) + if (((currentOutputBytes + message.getData().size()) >= maxPublishBatchByteSize) || (output.size() >= maxPublishBatchSize)) { publish(); } // NOTE: The record id is always null. - output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), - c.timestamp().getMillis(), - null)); - currentOutputBytes += payload.length; + output.add(OutgoingMessage.of(message, c.timestamp().getMillis(), null)); + currentOutputBytes += message.getData().size(); } @FinishBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index b830a5e8a4b6..01d104cdc8ea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -540,7 +539,16 @@ public static Read readAvrosWithBeamSchema(Class clazz) { /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ public static Write writeMessages() { - return Write.newBuilder().build(); + return Write.newBuilder(PubsubMessage::toProto).build(); + } + + /** + * Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. + * + *

The publish_time and message_id fields will be dropped if present. + */ + public static Write writeWireMessages() { + return Write.newBuilder(PubsubMessage::toProto).build(); } /** @@ -550,7 +558,9 @@ public static Write writeMessages() { public static Write writeStrings() { return Write.newBuilder( (String string) -> - new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), ImmutableMap.of())) + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(string)) + .build()) .build(); } @@ -871,21 +881,18 @@ public abstract static class Write extends PTransform, PDone> abstract String getIdAttribute(); /** The format function for input PubsubMessage objects. */ - abstract SerializableFunction getFormatFn(); + abstract SerializableFunction getFormatFn(); abstract Builder toBuilder(); - static Builder newBuilder(SerializableFunction formatFn) { + static Builder newBuilder( + SerializableFunction formatFn) { Builder builder = new AutoValue_PubsubIO_Write.Builder(); builder.setPubsubClientFactory(FACTORY); builder.setFormatFn(formatFn); return builder; } - static Builder newBuilder() { - return newBuilder(x -> x); - } - @AutoValue.Builder abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); @@ -900,7 +907,8 @@ abstract static class Builder { abstract Builder setIdAttribute(String idAttribute); - abstract Builder setFormatFn(SerializableFunction formatFn); + abstract Builder setFormatFn( + SerializableFunction formatFn); abstract Write build(); } @@ -987,26 +995,37 @@ public PDone expand(PCollection input) { throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); } + PCollection preprocessed = + input + .apply( + "Convert to proto", + MapElements.into(new TypeDescriptor() {}) + .via(getFormatFn())) + .setCoder(ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class)) + .apply( + "Strip id and publish time", + MapElements.into(new TypeDescriptor() {}) + .via( + message -> + message.toBuilder().clearMessageId().clearPublishTime().build())); + switch (input.isBounded()) { case BOUNDED: // NOTE: idAttribute is ignored. - input.apply(ParDo.of(PubsubBoundedWriter.of(this))); + preprocessed.apply(ParDo.of(PubsubBoundedWriter.of(this))); return PDone.in(input.getPipeline()); case UNBOUNDED: - return input - .apply(MapElements.into(new TypeDescriptor() {}).via(getFormatFn())) - .apply( - new PubsubUnboundedSink( - getPubsubClientFactory(), - NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), - getTimestampAttribute(), - getIdAttribute(), - 100 /* numShards */, - MoreObjects.firstNonNull( - getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull( - getMaxBatchBytesSize(), - PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES))); + return preprocessed.apply( + new PubsubUnboundedSink( + getPubsubClientFactory(), + NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), + getTimestampAttribute(), + getIdAttribute(), + 100 /* numShards */, + MoreObjects.firstNonNull( + getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull( + getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES))); } throw new RuntimeException(); // cases are exhaustive. } @@ -1029,11 +1048,14 @@ private static SerializableFunction parsePayloadUsingCoder }; } - private static SerializableFunction formatPayloadUsingCoder( - Coder coder) { + private static + SerializableFunction formatPayloadUsingCoder( + Coder coder) { return input -> { try { - return new PubsubMessage(CoderUtils.encodeToByteArray(coder, input), ImmutableMap.of()); + return com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(coder, input))) + .build(); } catch (CoderException e) { throw new RuntimeException("Could not encode Pubsub message", e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index cbe664b333c0..a17a7449d158 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.protobuf.ByteString; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; @@ -45,6 +46,18 @@ public PubsubMessage(byte[] payload, Map attributes, String mess this.messageId = messageId; } + com.google.pubsub.v1.PubsubMessage toProto() { + com.google.pubsub.v1.PubsubMessage.Builder builder = + com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom(getPayload())); + if (getAttributeMap() != null) { + builder.putAllAttributes(getAttributeMap()); + } + if (getMessageId() != null) { + builder.setMessageId(messageId); + } + return builder.build(); + } + /** Returns the main PubSub message. */ public byte[] getPayload() { return message; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 57da4c3bee40..cb6412e49d5d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -19,25 +19,13 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; @@ -90,33 +78,6 @@ public class PubsubUnboundedSink extends PTransform, /** Default longest delay between receiving a message and pushing it to Pubsub. */ private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); - /** Coder for conveying outgoing messages between internal stages. */ - private static class OutgoingMessageCoder extends AtomicCoder { - private static final NullableCoder RECORD_ID_CODER = - NullableCoder.of(StringUtf8Coder.of()); - private static final NullableCoder> ATTRIBUTES_CODER = - NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - @Override - public void encode(OutgoingMessage value, OutputStream outStream) - throws CoderException, IOException { - ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream); - RECORD_ID_CODER.encode(value.recordId(), outStream); - } - - @Override - public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException { - com.google.pubsub.v1.PubsubMessage message = - ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream); - long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream); - @Nullable String recordId = RECORD_ID_CODER.decode(inStream); - return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId); - } - } - - @VisibleForTesting static final Coder CODER = new OutgoingMessageCoder(); - // ================================================================================ // RecordIdMethod // ================================================================================ @@ -151,7 +112,6 @@ private static class ShardFn extends DoFn input) { AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency)))) .discardingFiredPanes()) .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod))) - .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) .apply(GroupByKey.create()) .apply( "PubsubUnboundedSink.Writer", diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index abecf899aa07..f742d5faa3a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -173,15 +173,16 @@ public void testConstructPubsubWrite() throws Exception { RunnerApi.PTransform transform = result.getTransform(); assertThat( - transform.getSubtransformsList(), - Matchers.contains( - "test_namespacetest/MapElements", "test_namespacetest/PubsubUnboundedSink")); + Iterables.getLast(transform.getSubtransformsList()), + Matchers.is("test_namespacetest/PubsubUnboundedSink")); assertThat(transform.getInputsCount(), Matchers.is(1)); assertThat(transform.getOutputsCount(), Matchers.is(0)); // test_namespacetest/PubsubUnboundedSink RunnerApi.PTransform writeComposite = - result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)); + result + .getComponents() + .getTransformsOrThrow(Iterables.getLast(transform.getSubtransformsList())); // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer RunnerApi.PTransform writeComposite2 = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 818b97b9e762..6f4bbeecb845 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.pubsub; import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -28,7 +29,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -67,9 +67,12 @@ private Stamp(Map attributes) { } @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(@Element String element, ProcessContext c) { c.outputWithTimestamp( - new PubsubMessage(c.element().getBytes(StandardCharsets.UTF_8), attributes), + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(element)) + .putAllAttributes(attributes) + .build(), new Instant(TIMESTAMP)); } } @@ -80,19 +83,6 @@ private String getRecordId(String data) { @Rule public transient TestPipeline p = TestPipeline.create(); - @Test - public void saneCoder() throws Exception { - OutgoingMessage message = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .build(), - TIMESTAMP, - getRecordId(DATA)); - CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); - CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); - } - @Test public void sendOneMessage() throws IOException { List outgoing = @@ -149,9 +139,7 @@ public void sendOneMessageWithoutAttributes() throws IOException { 1 /* batchBytes */, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); - p.apply(Create.of(ImmutableList.of(DATA))) - .apply(ParDo.of(new Stamp(null /* attributes */))) - .apply(sink); + p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } // The PubsubTestClientFactory will assert fail on close if the actual published