diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java new file mode 100644 index 000000000000..c8f8611f5892 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.protobuf.ByteString; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link PubsubIO.Read} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class ExternalRead implements ExternalTransformRegistrar { + public ExternalRead() {} + + public static final String URN = "beam:external:java:pubsub:read:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, ReadBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + @Nullable private String topic; + @Nullable private String subscription; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + private boolean needsAttributes; + + public void setTopic(@Nullable String topic) { + this.topic = topic; + } + + public void setSubscription(@Nullable String subscription) { + this.subscription = subscription; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + + public void setWithAttributes(Boolean needsAttributes) { + // we must use Boolean instead of boolean because the external payload system + // inspects the native type of each coder urn, and BooleanCoder wants Boolean. + this.needsAttributes = needsAttributes; + } + } + + public static class ReadBuilder + implements ExternalTransformBuilder> { + public ReadBuilder() {} + + @Override + public PTransform> buildExternal(Configuration config) { + PubsubIO.Read.Builder readBuilder = new AutoValue_PubsubIO_Read.Builder<>(); + readBuilder.setCoder(ByteArrayCoder.of()); + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + readBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); + } + if (config.subscription != null) { + StaticValueProvider subscription = StaticValueProvider.of(config.subscription); + readBuilder.setSubscriptionProvider( + NestedValueProvider.of(subscription, PubsubSubscription::fromPath)); + } + if (config.idAttribute != null) { + readBuilder.setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + readBuilder.setTimestampAttribute(config.timestampAttribute); + } + readBuilder.setNeedsAttributes(config.needsAttributes); + if (config.needsAttributes) { + readBuilder.setParseFn(new ParsePayloadAsPubsubMessageProto()); + } else { + readBuilder.setParseFn( + new SimpleFunction() { + @Override + public byte[] apply(PubsubMessage input) { + return input.getPayload(); + } + }); + } + readBuilder.setNeedsMessageId(false); + return readBuilder.build(); + } + } + + // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. + private static class ParsePayloadAsPubsubMessageProto + extends SimpleFunction { + @Override + public byte[] apply(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + return message.build().toByteArray(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java new file mode 100644 index 000000000000..93ab52d8caac --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link PubsubIO.Write} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class ExternalWrite implements ExternalTransformRegistrar { + public ExternalWrite() {} + + public static final String URN = "beam:external:java:pubsub:write:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, WriteBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + private String topic; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder, PDone> { + public WriteBuilder() {} + + @Override + public PTransform, PDone> buildExternal(Configuration config) { + PubsubIO.Write.Builder writeBuilder = new AutoValue_PubsubIO_Write.Builder<>(); + writeBuilder.setFormatFn(new FormatFn()); + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); + } + if (config.idAttribute != null) { + writeBuilder.setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + writeBuilder.setTimestampAttribute(config.timestampAttribute); + } + return writeBuilder.build(); + } + } + + private static class FormatFn extends SimpleFunction { + @Override + public PubsubMessage apply(byte[] input) { + try { + com.google.pubsub.v1.PubsubMessage message = + com.google.pubsub.v1.PubsubMessage.parseFrom(input); + return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Could not decode Pubsub message", e); + } + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index e9d893ffdc79..cd6bba0f814c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -20,10 +20,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.Clock; -import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -42,11 +40,9 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; @@ -58,7 +54,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -718,8 +713,7 @@ public abstract static class Read extends PTransform> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder - implements ExternalTransformBuilder> { + abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topic); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory); @@ -750,85 +744,6 @@ abstract static class Builder abstract Builder setClock(@Nullable Clock clock); abstract Read build(); - - @Override - public PTransform> buildExternal(External.Configuration config) { - if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(config.topic); - setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); - } - if (config.subscription != null) { - StaticValueProvider subscription = StaticValueProvider.of(config.subscription); - setSubscriptionProvider( - NestedValueProvider.of(subscription, new SubscriptionTranslator())); - } - if (config.idAttribute != null) { - setIdAttribute(config.idAttribute); - } - if (config.timestampAttribute != null) { - setTimestampAttribute(config.timestampAttribute); - } - setPubsubClientFactory(FACTORY); - setNeedsAttributes(config.needsAttributes); - Coder coder = ByteArrayCoder.of(); - if (config.needsAttributes) { - SimpleFunction parseFn = - (SimpleFunction) new ParsePayloadAsPubsubMessageProto(); - setParseFn(parseFn); - setCoder(coder); - } else { - setParseFn(new ParsePayloadUsingCoder<>(coder)); - setCoder(coder); - } - setNeedsMessageId(false); - return build(); - } - } - - /** Exposes {@link PubSubIO.Read} as an external transform for cross-language usage. */ - @Experimental(Kind.PORTABILITY) - @AutoService(ExternalTransformRegistrar.class) - public static class External implements ExternalTransformRegistrar { - - public static final String URN = "beam:external:java:pubsub:read:v1"; - - @Override - public Map> knownBuilders() { - return ImmutableMap.of(URN, AutoValue_PubsubIO_Read.Builder.class); - } - - /** Parameters class to expose the transform to an external SDK. */ - public static class Configuration { - - // All byte arrays are UTF-8 encoded strings - @Nullable private String topic; - @Nullable private String subscription; - @Nullable private String idAttribute; - @Nullable private String timestampAttribute; - private boolean needsAttributes; - - public void setTopic(@Nullable String topic) { - this.topic = topic; - } - - public void setSubscription(@Nullable String subscription) { - this.subscription = subscription; - } - - public void setIdLabel(@Nullable String idAttribute) { - this.idAttribute = idAttribute; - } - - public void setTimestampAttribute(@Nullable String timestampAttribute) { - this.timestampAttribute = timestampAttribute; - } - - public void setWithAttributes(Boolean needsAttributes) { - // we must use Boolean instead of boolean because the external payload system - // inspects the native type of each coder urn, and BooleanCoder wants Boolean. - this.needsAttributes = needsAttributes; - } - } } /** @@ -1051,8 +966,7 @@ public abstract static class Write extends PTransform, PDone> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder - implements ExternalTransformBuilder, PDone> { + abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory factory); @@ -1068,58 +982,6 @@ abstract static class Builder abstract Builder setFormatFn(SimpleFunction formatFn); abstract Write build(); - - @Override - public PTransform, PDone> buildExternal(External.Configuration config) { - if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(config.topic); - setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); - } - if (config.idAttribute != null) { - setIdAttribute(config.idAttribute); - } - if (config.timestampAttribute != null) { - setTimestampAttribute(config.timestampAttribute); - } - SimpleFunction parseFn = - (SimpleFunction) new FormatPayloadFromPubsubMessageProto(); - setFormatFn(parseFn); - return build(); - } - } - - /** Exposes {@link PubSubIO.Write} as an external transform for cross-language usage. */ - @Experimental(Kind.PORTABILITY) - @AutoService(ExternalTransformRegistrar.class) - public static class External implements ExternalTransformRegistrar { - - public static final String URN = "beam:external:java:pubsub:write:v1"; - - @Override - public Map> knownBuilders() { - return ImmutableMap.of(URN, AutoValue_PubsubIO_Write.Builder.class); - } - - /** Parameters class to expose the transform to an external SDK. */ - public static class Configuration { - - // All byte arrays are UTF-8 encoded strings - private String topic; - @Nullable private String idAttribute; - @Nullable private String timestampAttribute; - - public void setTopic(String topic) { - this.topic = topic; - } - - public void setIdLabel(@Nullable String idAttribute) { - this.idAttribute = idAttribute; - } - - public void setTimestampAttribute(@Nullable String timestampAttribute) { - this.timestampAttribute = timestampAttribute; - } - } } /** @@ -1369,22 +1231,6 @@ public T apply(PubsubMessage input) { } } - private static class ParsePayloadAsPubsubMessageProto - extends SimpleFunction { - @Override - public byte[] apply(PubsubMessage input) { - Map attributes = input.getAttributeMap(); - com.google.pubsub.v1.PubsubMessage.Builder message = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(input.getPayload())); - // TODO(BEAM-8085) this should not be null - if (attributes != null) { - message.putAllAttributes(attributes); - } - return message.build().toByteArray(); - } - } - private static class FormatPayloadAsUtf8 extends SimpleFunction { @Override public PubsubMessage apply(String input) { @@ -1409,20 +1255,6 @@ public PubsubMessage apply(T input) { } } - private static class FormatPayloadFromPubsubMessageProto - extends SimpleFunction { - @Override - public PubsubMessage apply(byte[] input) { - try { - com.google.pubsub.v1.PubsubMessage message = - com.google.pubsub.v1.PubsubMessage.parseFrom(input); - return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException("Could not decode Pubsub message", e); - } - } - } - private static class IdentityMessageFn extends SimpleFunction { @Override public PubsubMessage apply(PubsubMessage input) {