diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 302ae4f2fef0..07fda807bd45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -130,7 +130,11 @@ public static Set query(MetricResults results, Type type) { .build(); Set result = new HashSet<>(); for (MetricResult metrics : results.queryMetrics(filter).getStringSets()) { - result.addAll(metrics.getCommitted().getStringSet()); + try { + result.addAll(metrics.getCommitted().getStringSet()); + } catch (UnsupportedOperationException unused) { + // MetricsResult.getCommitted throws this exception when runner support missing, just skip. + } result.addAll(metrics.getAttempted().getStringSet()); } return result; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 47033451ab89..521e65b934b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -21,6 +21,7 @@ import java.util.Map; import javax.naming.SizeLimitExceededException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; @@ -43,6 +44,8 @@ public class PreparePubsubWriteDoFn extends DoFn private SerializableFunction, PubsubMessage> formatFunction; @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction; + /** Last TopicPath that reported Lineage. */ + private transient @Nullable String reportedLineage; private final BadRecordRouter badRecordRouter; @@ -165,6 +168,13 @@ public void process( return; } } + String topic = message.getTopic(); + // topic shouldn't be null, but lineage report is fail-safe + if (topic != null && !topic.equals(reportedLineage)) { + Lineage.getSinks() + .add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments()); + reportedLineage = topic; + } try { validatePubsubMessageSize(message, maxPublishBatchSize); } catch (SizeLimitExceededException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 8b582c1054f8..b561b4711d52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -1148,6 +1149,20 @@ public PCollection expand(PBegin input) { getNeedsOrderingKey()); PCollection preParse = input.apply(source); + return expandReadContinued(preParse, topicPath, subscriptionPath); + } + + /** + * Runner agnostic part of the Expansion. + * + *

Common logics (MapElements, SDK metrics, DLQ, etc) live here as PubsubUnboundedSource is + * overridden on Dataflow runner. + */ + private PCollection expandReadContinued( + PCollection preParse, + @Nullable ValueProvider topicPath, + @Nullable ValueProvider subscriptionPath) { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; PCollection read; if (getDeadLetterTopicProvider() == null @@ -1174,7 +1189,7 @@ public PCollection expand(PBegin input) { "Map Failures To BadRecords", ParDo.of(new ParseReadFailuresToBadRecords(preParse.getCoder()))); getBadRecordErrorHandler() - .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline()))); + .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(preParse.getPipeline()))); } else { // Write out failures to the provided dead-letter topic. result @@ -1215,7 +1230,31 @@ public PCollection expand(PBegin input) { .withClientFactory(getPubsubClientFactory())); } } - + // report Lineage once + preParse + .getPipeline() + .apply(Impulse.create()) + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void process() { + if (topicPath != null) { + TopicPath topic = topicPath.get(); + if (topic != null) { + Lineage.getSources() + .add("pubsub", "topic", topic.getDataCatalogSegments()); + } + } + if (subscriptionPath != null) { + SubscriptionPath sub = subscriptionPath.get(); + if (sub != null) { + Lineage.getSources() + .add("pubsub", "subscription", sub.getDataCatalogSegments()); + } + } + } + })); return read.setCoder(getCoder()); } @@ -1623,10 +1662,6 @@ public void finishBundle() throws IOException { for (Map.Entry entry : output.entrySet()) { publish(entry.getKey(), entry.getValue().messages); } - // Report lineage for all topics seen - for (PubsubTopic topic : output.keySet()) { - Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments()); - } output = null; pubsubClient.close(); pubsubClient = null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 38d77aa3aac3..aa8e3a411486 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.options.ValueProvider; @@ -232,9 +231,6 @@ private static class WriterFn extends DoFn, Void> { /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ private transient @Nullable PubsubClient pubsubClient; - /** Last TopicPath that reported Lineage. */ - private transient @Nullable TopicPath reportedLineage; - private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); private final Counter elementCounter = SinkMetrics.elementsWritten(); private final Counter byteCounter = SinkMetrics.bytesWritten(); @@ -294,14 +290,6 @@ private void publishBatch(List messages, int bytes) throws IOEx batchCounter.inc(); elementCounter.inc(messages.size()); byteCounter.inc(bytes); - // Report Lineage multiple once for same topic - if (!topicPath.equals(reportedLineage)) { - List segments = topicPath.getDataCatalogSegments(); - if (segments.size() != 0) { - Lineage.getSinks().add("pubsub", "topic", segments); - } - reportedLineage = topicPath; - } } @StartBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 95fa5c223419..b9a554d54ade 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -56,7 +56,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -1042,19 +1041,6 @@ public List split(int desiredNumSplits, PipelineOptions options) splitSource = new PubsubSource( outer, StaticValueProvider.of(outer.createRandomSubscription(options))); - TopicPath topic = outer.getTopic(); - if (topic != null) { - // is initial split on Read.fromTopic, report Lineage based on topic - Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments()); - } - } else { - if (subscriptionPath.equals(outer.getSubscriptionProvider())) { - SubscriptionPath sub = subscriptionPath.get(); - if (sub != null) { - // is a split on Read.fromSubscription - Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments()); - } - } } for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { // Since the source is immutable and Pubsub automatically shards we simply