From e14f81d88c5c356454db5cfa484e77ef71c2a98a Mon Sep 17 00:00:00 2001 From: Minbo Bae Date: Sun, 11 Dec 2022 19:02:26 -0800 Subject: [PATCH] Close Connection to prevent gRPC leak error messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reader.close(): fixes #2658 where original fix #2782 was rolled back by #2871 and #2873. As we don’t use “session” here anymore, I believe we should close “connection” here. * AbstractCloudBigtableTableDoFn.tearDown(): If an exception happened in ProcessElement, FinishBundle is not executed. We should clean up resources in TearDown. You can see the similar code with BigQueryIO https://github.com/apache/beam/pull/14949. --- .../beam/AbstractCloudBigtableTableDoFn.java | 14 ++++++++++++++ .../cloud/bigtable/beam/CloudBigtableIO.java | 13 +++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java index 5e9a19953a..6e00edc23a 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.beam; import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -125,4 +126,17 @@ public void populateDisplayData(DisplayData.Builder builder) { public CloudBigtableConfiguration getConfig() { return config; } + + @Teardown + public void tearDown() { + // Close connection when DoFn is aborted. + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + DOFN_LOG.info("Ignore an exception encountered while closing the connection", e); + } + connection = null; + } + } } diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java index 3b46d75753..609d7d8f5b 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java @@ -685,11 +685,20 @@ public ByteKeyRangeTracker getRangeTracker() { /** Closes the {@link ResultScanner}, {@link Table}, and {@link Connection}. */ @Override - public void close() throws IOException { + public void close() { if (scanner != null) { scanner.close(); scanner = null; } + + if (connection != null) { + try { + connection.close(); + } catch (IOException ignored) { + } + connection = null; + } + long totalOps = getRowsReadCount(); long elapsedTimeMs = System.currentTimeMillis() - workStart; long operationsPerSecond = elapsedTimeMs == 0 ? 0 : (totalOps * 1000 / elapsedTimeMs); @@ -809,7 +818,7 @@ public void processElement(ProcessContext context) throws Exception { mutationsCounter.inc(); } - /** Closes the {@link BufferedMutator} and {@link Connection}. */ + /** Closes the {@link BufferedMutator}. */ @FinishBundle public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleContext context) throws Exception {