Lineage.java
@@ -130,7 +130,11 @@ public static Set<String> query(MetricResults results, Type type) {
.build();
Set<String> result = new HashSet<>();
for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) {
result.addAll(metrics.getCommitted().getStringSet());
try {
result.addAll(metrics.getCommitted().getStringSet());
} catch (UnsupportedOperationException unused) {
// MetricResult.getCommitted throws when the runner does not support committed metrics; skip it and rely on the attempted metrics added below.
}
result.addAll(metrics.getAttempted().getStringSet());
}
return result;
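For context, `MetricResult.getCommitted()` throws `UnsupportedOperationException` on runners that only track attempted metrics, so this fallback lets `Lineage.query` work everywhere. A minimal caller-side sketch using the APIs visible in this hunk; `printLineage` is a hypothetical helper, and the `Lineage.Type.SOURCE`/`SINK` constants are assumed from the `Type` parameter above:

    import java.util.Set;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.Lineage;

    // Query lineage after a run; with the try/catch above this no longer
    // fails on runners that cannot report committed metrics.
    static void printLineage(PipelineResult result) {
      result.waitUntilFinish();
      Set<String> sources = Lineage.query(result.metrics(), Lineage.Type.SOURCE);
      Set<String> sinks = Lineage.query(result.metrics(), Lineage.Type.SINK);
      System.out.println("sources=" + sources + " sinks=" + sinks);
    }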
PreparePubsubWriteDoFn.java
@@ -21,6 +21,7 @@
import java.util.Map;
import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -43,6 +44,8 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>

private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
@Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
/** Last topic (full path) for which Lineage was reported. */
private transient @Nullable String reportedLineage;

private final BadRecordRouter badRecordRouter;

@@ -165,6 +168,13 @@ public void process(
return;
}
}
String topic = message.getTopic();
// topic should not be null at this point, but lineage reporting is best-effort, so guard anyway
if (topic != null && !topic.equals(reportedLineage)) {
Lineage.getSinks()
.add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
reportedLineage = topic;
}
try {
validatePubsubMessageSize(message, maxPublishBatchSize);
} catch (SizeLimitExceededException e) {
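The new `reportedLineage` field is a per-instance dedup guard: `process` may see many messages for the same topic, and re-adding identical lineage on every element is wasted work. The guard only suppresses consecutive duplicates; with dynamic topics that alternate, lineage would be re-reported, which is harmless because lineage is backed by string-set metrics (see Lineage.java above). A condensed sketch of the idiom, with `LineageGuard` and `maybeReport` as illustrative names not taken from this PR:

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
    import org.apache.beam.sdk.metrics.Lineage;
    import org.checkerframework.checker.nullness.qual.Nullable;

    class LineageGuard {
      // Transient so the cached value is not serialized with the DoFn and
      // resets whenever a new instance is deserialized.
      private transient @Nullable String lastReportedTopic;

      void maybeReport(@Nullable String topic) {
        if (topic != null && !topic.equals(lastReportedTopic)) {
          Lineage.getSinks()
              .add("pubsub", "topic",
                  PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
          lastReportedTopic = topic; // skip repeats until the topic changes
        }
      }
    }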
PubsubIO.java
@@ -56,6 +56,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -1148,6 +1149,20 @@ public PCollection<T> expand(PBegin input) {
getNeedsOrderingKey());

PCollection<PubsubMessage> preParse = input.apply(source);
return expandReadContinued(preParse, topicPath, subscriptionPath);
}

/**
* Runner-agnostic part of the expansion.
*
* <p>Common logic (MapElements, SDK metrics, DLQ, etc.) lives here because PubsubUnboundedSource is
* overridden on the Dataflow runner.
*/
private PCollection<T> expandReadContinued(
PCollection<PubsubMessage> preParse,
@Nullable ValueProvider<TopicPath> topicPath,
@Nullable ValueProvider<SubscriptionPath> subscriptionPath) {

TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
PCollection<T> read;
if (getDeadLetterTopicProvider() == null
@@ -1174,7 +1189,7 @@ public PCollection<T> expand(PBegin input) {
"Map Failures To BadRecords",
ParDo.of(new ParseReadFailuresToBadRecords(preParse.getCoder())));
getBadRecordErrorHandler()
.addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline())));
.addErrorCollection(badRecords.setCoder(BadRecord.getCoder(preParse.getPipeline())));
} else {
// Write out failures to the provided dead-letter topic.
result
@@ -1215,7 +1230,31 @@ public PCollection<T> expand(PBegin input) {
.withClientFactory(getPubsubClientFactory()));
}
}

// Report Lineage once via a single-element Impulse
preParse
.getPipeline()
.apply(Impulse.create())
.apply(
ParDo.of(
new DoFn<byte[], Void>() {
@ProcessElement
public void process() {
if (topicPath != null) {
TopicPath topic = topicPath.get();
if (topic != null) {
Lineage.getSources()
.add("pubsub", "topic", topic.getDataCatalogSegments());
}
}
if (subscriptionPath != null) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
Lineage.getSources()
.add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
}
}));
return read.setCoder(getCoder());
}
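The block above uses `Impulse.create()`, which emits exactly one empty `byte[]` when the pipeline starts, so the attached `DoFn` runs once per pipeline (re-running only on retries); it also defers the `ValueProvider.get()` calls to execution time, when templated values are available. A minimal sketch of this "run once" idiom with the side effect left abstract; `runOnce` is a hypothetical helper:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Impulse;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    // "Run once per pipeline" idiom: Impulse emits a single empty byte[],
    // so process() executes exactly once unless the bundle is retried.
    static void runOnce(Pipeline pipeline, SerializableFunction<Void, Void> sideEffect) {
      pipeline
          .apply(Impulse.create())
          .apply(
              ParDo.of(
                  new DoFn<byte[], Void>() {
                    @ProcessElement
                    public void process() {
                      sideEffect.apply(null);
                    }
                  }));
    }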

@@ -1623,10 +1662,6 @@ public void finishBundle() throws IOException {
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey(), entry.getValue().messages);
}
// Report lineage for all topics seen
for (PubsubTopic topic : output.keySet()) {
Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments());
}
output = null;
pubsubClient.close();
pubsubClient = null;
PubsubUnboundedSink.java
@@ -41,7 +41,6 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
@@ -232,9 +231,6 @@ private static class WriterFn extends DoFn<Iterable<OutgoingMessage>, Void> {
/** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */
private transient @Nullable PubsubClient pubsubClient;

/** Last TopicPath that reported Lineage. */
private transient @Nullable TopicPath reportedLineage;

private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
private final Counter elementCounter = SinkMetrics.elementsWritten();
private final Counter byteCounter = SinkMetrics.bytesWritten();
@@ -294,14 +290,6 @@ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException
batchCounter.inc();
elementCounter.inc(messages.size());
byteCounter.inc(bytes);
// Report Lineage only once for the same topic
if (!topicPath.equals(reportedLineage)) {
List<String> segments = topicPath.getDataCatalogSegments();
if (segments.size() != 0) {
Lineage.getSinks().add("pubsub", "topic", segments);
}
reportedLineage = topicPath;
}
}

@StartBundle
PubsubUnboundedSource.java
@@ -56,7 +56,6 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -1042,19 +1041,6 @@ public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options)
splitSource =
new PubsubSource(
outer, StaticValueProvider.of(outer.createRandomSubscription(options)));
TopicPath topic = outer.getTopic();
if (topic != null) {
// is initial split on Read.fromTopic, report Lineage based on topic
Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments());
}
} else {
if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
// is a split on Read.fromSubscription
Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
}
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply