diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java index 5e9a19953a..6e00edc23a 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/AbstractCloudBigtableTableDoFn.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.beam; import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -125,4 +126,17 @@ public void populateDisplayData(DisplayData.Builder builder) { public CloudBigtableConfiguration getConfig() { return config; } + + @Teardown + public void tearDown() { + // Close connection when DoFn is aborted. + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + DOFN_LOG.info("Ignore an exception encountered while closing the connection", e); + } + connection = null; + } + } } diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java index 3b46d75753..609d7d8f5b 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java @@ -685,11 +685,20 @@ public ByteKeyRangeTracker getRangeTracker() { /** Closes the {@link ResultScanner}, {@link Table}, and {@link Connection}. */ @Override - public void close() throws IOException { + public void close() { if (scanner != null) { scanner.close(); scanner = null; } + + if (connection != null) { + try { + connection.close(); + } catch (IOException ignored) { + } + connection = null; + } + long totalOps = getRowsReadCount(); long elapsedTimeMs = System.currentTimeMillis() - workStart; long operationsPerSecond = elapsedTimeMs == 0 ? 0 : (totalOps * 1000 / elapsedTimeMs); @@ -809,7 +818,7 @@ public void processElement(ProcessContext context) throws Exception { mutationsCounter.inc(); } - /** Closes the {@link BufferedMutator} and {@link Connection}. */ + /** Closes the {@link BufferedMutator}. */ @FinishBundle public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleContext context) throws Exception {