From c008dd2f4eac82b5a01426f3cd61a63fb48d2b3a Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 2 Aug 2024 14:42:44 -0400 Subject: [PATCH 1/4] Add Lineage metrics for BigtableIO --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 14 ++++++++++++ .../sdk/io/gcp/bigtable/BigtableService.java | 6 +++++ .../io/gcp/bigtable/BigtableServiceImpl.java | 22 +++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index d78ae2cb6c57..6d20109e947b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1337,6 +1337,7 @@ private static class BigtableWriterFn private transient Set> badRecords = null; // Due to callback thread not supporting Beam metrics, Record pending metrics and report later. private transient long pendingThrottlingMsecs; + private transient boolean reportedLineage; // Assign serviceEntry in startBundle and clear it in tearDown. @Nullable private BigtableServiceEntry serviceEntry; @@ -1480,6 +1481,10 @@ public void finishBundle(FinishBundleContext c) throws Exception { throttlingMsecs.inc(excessTime); } } + if (!reportedLineage) { + bigtableWriter.reportLineage(); + reportedLineage = true; + } bigtableWriter = null; } @@ -1612,6 +1617,7 @@ public String toString() { private final BigtableConfig config; private final BigtableReadOptions readOptions; private @Nullable Long estimatedSizeBytes; + private transient boolean reportedLineage; private final BigtableServiceFactory.ConfigId configId; @@ -1989,6 +1995,13 @@ public List getRanges() { public ValueProvider getTableId() { return readOptions.getTableId(); } + + void reportLineageOnce(BigtableService.Reader reader) { + if (!reportedLineage) { + reader.reportLineage(); + reportedLineage = true; + } + } } private static class BigtableReader extends BoundedReader { @@ -2019,6 +2032,7 @@ true, makeByteKey(reader.getCurrentRow().getKey()))) || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; + source.reportLineageOnce(reader); } return hasRecord; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java index 261cc3ac081d..50d8126999c4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java @@ -57,6 +57,9 @@ CompletionStage writeRecord(KV * @throws IOException if there is an error closing the writer */ void close() throws IOException; + + /** Report Lineage metrics to runner. */ + default void reportLineage() {} } /** The interface of a class that reads from Cloud Bigtable. */ @@ -77,6 +80,9 @@ interface Reader { Row getCurrentRow() throws NoSuchElementException; void close(); + + /** Report Lineage metrics to runner. */ + default void reportLineage() {} } /** Returns a {@link Reader} that will read from the specified source. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index f06a4a127686..32aa04ee7c9c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -212,6 +213,11 @@ public void close() { exhausted = true; } } + + @Override + public void reportLineage() { + Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } } @VisibleForTesting @@ -225,6 +231,9 @@ static class BigtableSegmentReaderImpl implements Reader { private final int refillSegmentWaterMark; private final long maxSegmentByteSize; private ServiceCallMetric serviceCallMetric; + private final String projectId; + private final String instanceId; + private final String tableId; private static class UpstreamResults { private final List rows; @@ -308,11 +317,19 @@ static BigtableSegmentReaderImpl create( // Asynchronously refill buffer when there is 10% of the elements are left this.refillSegmentWaterMark = Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE)); + this.projectId = projectId; + this.instanceId = instanceId; + this.tableId = tableId; } @Override public void close() {} + @Override + public void reportLineage() { + Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } + @Override public boolean start() throws IOException { future = fetchNextSegment(); @@ -578,6 +595,11 @@ public void writeSingleRecord(KV> record) throws } } + @Override + public void reportLineage() { + Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } + private ServiceCallMetric createServiceCallMetric() { // Populate metrics HashMap baseLabels = new HashMap<>(); From 202db03b32c64f025aa4e178bdbd86a8ef477449 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 2 Aug 2024 19:07:07 -0400 Subject: [PATCH 2/4] add tests --- .../org/apache/beam/sdk/metrics/Lineage.java | 33 ++++++++++++++--- .../io/gcp/bigtable/BigtableServiceImpl.java | 4 +-- .../io/gcp/bigquery/BigQueryIOReadTest.java | 35 ++++++++----------- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 17 +++------ .../sdk/io/gcp/bigtable/BigtableReadIT.java | 23 ++++++++++-- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 21 +++++++++-- 6 files changed, 90 insertions(+), 43 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 7890a9f74b94..0ad0cdb9981b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -23,11 +23,8 @@ public class Lineage { public static final String LINEAGE_NAMESPACE = "lineage"; - public static final String SOURCE_METRIC_NAME = "sources"; - public static final String SINK_METRIC_NAME = "sinks"; - - private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME); - private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, SINK_METRIC_NAME); + private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); + private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); /** {@link StringSet} representing sources and optionally side inputs. */ public static StringSet getSources() { @@ -38,4 +35,30 @@ public static StringSet getSources() { public static StringSet getSinks() { return SINKS; } + + /** Query {@link StringSet} metrics from {@link MetricResults} */ + public static Iterable> query(MetricResults results, Type type) { + MetricsFilter filter = MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString())).build(); + return results.queryMetrics(filter).getStringSets(); + } + + /** + * Lineage metrics resource types. + */ + public enum Type { + SOURCE("source"), + SINK("sink"); + + private final String name; + + Type(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 32aa04ee7c9c..6fdf67722bac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -216,7 +216,7 @@ public void close() { @Override public void reportLineage() { - Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); } } @@ -327,7 +327,7 @@ public void close() {} @Override public void reportLineage() { - Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index 5c43666e79e5..5e3e134d6dcb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecordBase; @@ -61,9 +62,8 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -350,19 +350,15 @@ private void checkTypedReadQueryObjectWithValidate( assertEquals(validate, read.getValidate()); } - private void checkLineageSourceMetric(PipelineResult pipelineResult, String tableName) { - MetricQueryResults lineageMetrics = - pipelineResult - .metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter( - MetricNameFilter.named( - Lineage.LINEAGE_NAMESPACE, Lineage.SOURCE_METRIC_NAME)) - .build()); - assertThat( - lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(), - contains("bigquery:" + tableName.replace(':', '.'))); + private void checkLineageSourceMetric( + PipelineResult pipelineResult, String tableName, boolean supportCommitted) { + MetricResult lineageMetrics = + Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE).iterator().next(); + Set result = + supportCommitted + ? lineageMetrics.getCommitted().getStringSet() + : lineageMetrics.getAttempted().getStringSet(); + assertThat(result, contains("bigquery:" + tableName.replace(':', '.'))); } @Before @@ -600,10 +596,9 @@ public void processElement(ProcessContext c) throws Exception { new MyData("b", 2L, bd1, bd2), new MyData("c", 3L, bd1, bd2))); PipelineResult result = p.run(); - // Skip when direct runner splits outside of a counters context. - if (useTemplateCompatibility) { - checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable"); - } + // only check attempted metrics when direct runner splits outside of a counters context. + checkLineageSourceMetric( + result, "non-executing-project:somedataset.sometable", useTemplateCompatibility); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index bc90d4c8bae7..03cdff51529d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -118,9 +118,8 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.JavaFieldSchema; @@ -285,16 +284,10 @@ public void evaluate() throws Throwable { .withJobService(fakeJobService); private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) { - MetricQueryResults lineageMetrics = - pipelineResult - .metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter( - MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME)) - .build()); + MetricResult lineageMetrics = + Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK).iterator().next(); assertThat( - lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(), + lineageMetrics.getCommitted().getStringSet(), hasItem("bigquery:" + tableName.replace(':', '.'))); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index bc88858ebc33..bbf66b5ec6bd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io.gcp.bigtable; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; + import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -28,7 +31,10 @@ import java.util.Date; import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -110,7 +116,8 @@ public void testE2EBigtableRead() { p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId)) .apply(Count.globally()); PAssert.thatSingleton(count).isEqualTo(numRows); - p.run(); + PipelineResult r = p.run(); + checkLineageSourceMetric(r, tableId); } @Test @@ -138,6 +145,18 @@ public void testE2EBigtableSegmentRead() { .withMaxBufferElementCount(10)) .apply(Count.globally()); PAssert.thatSingleton(count).isEqualTo(numRows); - p.run(); + PipelineResult r = p.run(); + checkLineageSourceMetric(r, tableId); + } + + private void checkLineageSourceMetric(PipelineResult r, String tableId) { + // Only check lineage metrics on direct runner until Dataflow runner v2 supported report back + if (options.getRunner().getName().contains("DirectRunner")) { + StringSetResult lineageMetrics = + Lineage.query(r.metrics(), Lineage.Type.SOURCE).iterator().next().getCommitted(); + assertThat( + lineageMetrics.getStringSet(), + hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index bf9f7d991fa2..e89f4f8c89e5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigtable; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import com.google.api.gax.rpc.ServerStream; @@ -39,8 +40,11 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.PAssert; @@ -142,7 +146,7 @@ public void processElement(ProcessContext c) { .withProjectId(project) .withInstanceId(options.getInstanceId()) .withTableId(tableId)); - p.run(); + PipelineResult r = p.run(); // Test number of column families and column family name equality Table table = getTable(tableId); @@ -154,6 +158,7 @@ public void processElement(ProcessContext c) { // Test table data equality List> tableData = getTableData(tableId); assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray())); + checkLineageSinkMetric(r, tableId); } @Test @@ -340,7 +345,7 @@ public void failureTest(int numRows, DoFn> tableData = getTableData(tableId); assertEquals(998, tableData.size()); + checkLineageSinkMetric(r, tableId); } @After @@ -412,4 +418,15 @@ private void deleteTable(String tableId) { tableAdminClient.deleteTable(tableId); } } + + private void checkLineageSinkMetric(PipelineResult r, String tableId) { + // Only check lineage metrics on direct runner until Dataflow runner v2 supported report back + if (options.getRunner().getName().contains("DirectRunner")) { + StringSetResult lineageMetrics = + Lineage.query(r.metrics(), Lineage.Type.SINK).iterator().next().getCommitted(); + assertThat( + lineageMetrics.getStringSet(), + hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + } + } } From b5df20184c4b71486dc4fb13daef6db053c37add Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 2 Aug 2024 22:03:03 -0400 Subject: [PATCH 3/4] simplify metrics query logics; exclude test actually already failing --- .../org/apache/beam/sdk/metrics/Lineage.java | 28 ++++++++++++------- .../io/google-cloud-platform/build.gradle | 4 +++ .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 ++ .../io/gcp/bigquery/BigQueryIOReadTest.java | 16 ++--------- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 6 +--- .../sdk/io/gcp/bigtable/BigtableReadIT.java | 8 ++---- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 5 +--- 7 files changed, 32 insertions(+), 37 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 0ad0cdb9981b..8b69b0ef5523 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,13 +17,16 @@ */ package org.apache.beam.sdk.metrics; +import java.util.HashSet; +import java.util.Set; + /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { - public static final String LINEAGE_NAMESPACE = "lineage"; - private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); + private static final StringSet SOURCES = + Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); /** {@link StringSet} representing sources and optionally side inputs. */ @@ -36,16 +39,21 @@ public static StringSet getSinks() { return SINKS; } - /** Query {@link StringSet} metrics from {@link MetricResults} */ - public static Iterable> query(MetricResults results, Type type) { - MetricsFilter filter = MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString())).build(); - return results.queryMetrics(filter).getStringSets(); + /** Query {@link StringSet} metrics from {@link MetricResults}. */ + public static Set query(MetricResults results, Type type) { + MetricsFilter filter = + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString())) + .build(); + Set result = new HashSet<>(); + for (MetricResult metrics : results.queryMetrics(filter).getStringSets()) { + result.addAll(metrics.getCommitted().getStringSet()); + result.addAll(metrics.getAttempted().getStringSet()); + } + return result; } - /** - * Lineage metrics resource types. - */ + /** Lineage metrics resource types. */ public enum Type { SOURCE("source"), SINK("sink"); diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index e499bae6fc64..23c56f13a94c 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn: processTestResources) { useJUnit { excludeCategories "org.apache.beam.sdk.testing.UsesKms" + filter { + // https://github.com/apache/beam/issues/32071 + excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead' + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 6d20109e947b..60fa510f8d38 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2030,7 +2030,9 @@ public boolean start() throws IOException { && rangeTracker.tryReturnRecordAt( true, makeByteKey(reader.getCurrentRow().getKey()))) || rangeTracker.markDone(); + LOG.warn("called start"); if (hasRecord) { + LOG.warn("called has record"); ++recordsReturned; source.reportLineageOnce(reader); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index 5e3e134d6dcb..a8aca7570b33 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -62,8 +62,6 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -350,14 +348,8 @@ private void checkTypedReadQueryObjectWithValidate( assertEquals(validate, read.getValidate()); } - private void checkLineageSourceMetric( - PipelineResult pipelineResult, String tableName, boolean supportCommitted) { - MetricResult lineageMetrics = - Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE).iterator().next(); - Set result = - supportCommitted - ? lineageMetrics.getCommitted().getStringSet() - : lineageMetrics.getAttempted().getStringSet(); + private void checkLineageSourceMetric(PipelineResult pipelineResult, String tableName) { + Set result = Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE); assertThat(result, contains("bigquery:" + tableName.replace(':', '.'))); } @@ -596,9 +588,7 @@ public void processElement(ProcessContext c) throws Exception { new MyData("b", 2L, bd1, bd2), new MyData("c", 3L, bd1, bd2))); PipelineResult result = p.run(); - // only check attempted metrics when direct runner splits outside of a counters context. - checkLineageSourceMetric( - result, "non-executing-project:somedataset.sometable", useTemplateCompatibility); + checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable"); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 03cdff51529d..c5af8045bfe2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -118,8 +118,6 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.JavaFieldSchema; @@ -284,10 +282,8 @@ public void evaluate() throws Throwable { .withJobService(fakeJobService); private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) { - MetricResult lineageMetrics = - Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK).iterator().next(); assertThat( - lineageMetrics.getCommitted().getStringSet(), + Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK), hasItem("bigquery:" + tableName.replace(':', '.'))); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index bbf66b5ec6bd..80f910a76718 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -150,12 +149,11 @@ public void testE2EBigtableSegmentRead() { } private void checkLineageSourceMetric(PipelineResult r, String tableId) { - // Only check lineage metrics on direct runner until Dataflow runner v2 supported report back + // TODO(https://github.com/apache/beam/issues/32071) test malformed, + // when pipeline.run() os non-blocking, the metrics are not available by the time of query if (options.getRunner().getName().contains("DirectRunner")) { - StringSetResult lineageMetrics = - Lineage.query(r.metrics(), Lineage.Type.SOURCE).iterator().next().getCommitted(); assertThat( - lineageMetrics.getStringSet(), + Lineage.query(r.metrics(), Lineage.Type.SOURCE), hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index e89f4f8c89e5..46bb3df836e5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.PAssert; @@ -422,10 +421,8 @@ private void deleteTable(String tableId) { private void checkLineageSinkMetric(PipelineResult r, String tableId) { // Only check lineage metrics on direct runner until Dataflow runner v2 supported report back if (options.getRunner().getName().contains("DirectRunner")) { - StringSetResult lineageMetrics = - Lineage.query(r.metrics(), Lineage.Type.SINK).iterator().next().getCommitted(); assertThat( - lineageMetrics.getStringSet(), + Lineage.query(r.metrics(), Lineage.Type.SINK), hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); } } From 8c050c05307dbce5a7359f979cc7031eb0c3e12b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 6 Aug 2024 13:12:15 -0400 Subject: [PATCH 4/4] Address comments, fix typo --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 -- .../org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 60fa510f8d38..6d20109e947b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2030,9 +2030,7 @@ public boolean start() throws IOException { && rangeTracker.tryReturnRecordAt( true, makeByteKey(reader.getCurrentRow().getKey()))) || rangeTracker.markDone(); - LOG.warn("called start"); if (hasRecord) { - LOG.warn("called has record"); ++recordsReturned; source.reportLineageOnce(reader); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 80f910a76718..4ce9ad10b2c0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -150,7 +150,7 @@ public void testE2EBigtableSegmentRead() { private void checkLineageSourceMetric(PipelineResult r, String tableId) { // TODO(https://github.com/apache/beam/issues/32071) test malformed, - // when pipeline.run() os non-blocking, the metrics are not available by the time of query + // when pipeline.run() is non-blocking, the metrics are not available by the time of query if (options.getRunner().getName().contains("DirectRunner")) { assertThat( Lineage.query(r.metrics(), Lineage.Type.SOURCE),