Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An (abstract) helper class for talking to Pubsub via an underlying transport. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class PubsubClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PubsubClient.class);
private static final Map<String, SerializableFunction<String, Schema>>
schemaTypeToConversionFnMap =
ImmutableMap.of(
Expand Down Expand Up @@ -257,6 +260,10 @@ public String getFullPath() {
return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
}

public String getDataCatalogName() {
return String.format("pubsub:subscription:%s.%s", projectId, subscriptionName);
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
Expand Down Expand Up @@ -293,6 +300,7 @@ public static SubscriptionPath subscriptionPathFromName(

/** Path representing a Pubsub topic. */
public static class TopicPath implements Serializable {
// Format: "projects/<project>/topics/<topic>"
private final String path;

TopicPath(String path) {
Expand All @@ -310,6 +318,26 @@ public String getName() {
return splits.get(3);
}

/**
* Returns the data catalog name. Format "pubsub:topic:`project`.`topic`" This method is
* fail-safe. If topic path is malformed, it returns an empty string.
*/
public String getDataCatalogName() {
List<String> splits = Splitter.on('/').splitToList(path);
if (splits.size() == 4) {
// well-formed path
return String.format("pubsub:topic:%s.%s", splits.get(1), splits.get(3));
} else {
// Mal-formed path. It is either a test fixture or user error and will fail on publish.
// We do not throw exception instead return empty string here.
LOG.warn(
"Cannot get data catalog name for malformed topic path {}. Expected format: "
+ "projects/<project>/topics/<topic>",
path);
return "";
}
}

public String getFullPath() {
List<String> splits = Splitter.on('/').splitToList(path);
checkState(splits.size() == 4, "Malformed topic path %s", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -512,6 +513,10 @@ public String asPath() {
}
}

public String dataCatalogName() {
return String.format("pubsub:topic:%s.%s", project, topic);
}

@Override
public String toString() {
return asPath();
Expand Down Expand Up @@ -1617,6 +1622,10 @@ public void finishBundle() throws IOException {
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey(), entry.getValue().messages);
}
// Report lineage for all topics seen
for (PubsubTopic topic : output.keySet()) {
Lineage.getSinks().add(topic.dataCatalogName());
}
output = null;
pubsubClient.close();
pubsubClient = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -231,6 +233,9 @@ private static class WriterFn extends DoFn<Iterable<OutgoingMessage>, Void> {
/** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */
private transient @Nullable PubsubClient pubsubClient;

/** Last TopicPath that reported Lineage. */
private transient @Nullable TopicPath reportedLineage;

private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
private final Counter elementCounter = SinkMetrics.elementsWritten();
private final Counter byteCounter = SinkMetrics.bytesWritten();
Expand Down Expand Up @@ -290,6 +295,14 @@ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOEx
batchCounter.inc();
elementCounter.inc(messages.size());
byteCounter.inc(bytes);
// Report Lineage multiple once for same topic
if (!topicPath.equals(reportedLineage)) {
String name = topicPath.getDataCatalogName();
if (!Strings.isNullOrEmpty(name)) {
Lineage.getSinks().add(topicPath.getDataCatalogName());
}
reportedLineage = topicPath;
}
}

@StartBundle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -1041,6 +1042,19 @@ public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options)
splitSource =
new PubsubSource(
outer, StaticValueProvider.of(outer.createRandomSubscription(options)));
TopicPath topic = outer.getTopic();
if (topic != null) {
// is initial split on Read.fromTopic, report Lineage based on topic
Lineage.getSources().add(topic.getDataCatalogName());
}
} else {
if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
// is a split on Read.fromSubscription
Lineage.getSources().add(sub.getDataCatalogName());
}
}
}
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,15 @@ public void subscriptionPathFromNameWellFormed() {
SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
assertEquals("projects/test/subscriptions/something", path.getPath());
assertEquals("/subscriptions/test/something", path.getFullPath());
assertEquals("pubsub:subscription:test.something", path.getDataCatalogName());
}

@Test
public void topicPathFromNameWellFormed() {
TopicPath path = PubsubClient.topicPathFromName("test", "something");
assertEquals("projects/test/topics/something", path.getPath());
assertEquals("/topics/test/something", path.getFullPath());
assertEquals("pubsub:topic:test.something", path.getDataCatalogName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ public void testValueProviderTopic() {
assertThat(pubsubRead.getTopicProvider(), not(nullValue()));
assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get()));
assertThat(
pubsubRead.getTopicProvider().get().dataCatalogName(),
equalTo("pubsub:topic:project.topic"));
}

@Test
Expand Down