From 878cf7bd206d1fa631b7d60165d24fc6853a0ea5 Mon Sep 17 00:00:00 2001 From: reuvenlax Date: Wed, 9 Jun 2021 08:49:26 -0700 Subject: [PATCH] Merge pull request #14949: [BEAM-12356] Cache and shutdown BigQuery services --- .../gcp/bigquery/BatchedStreamingWrite.java | 27 +++++++++++++- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 15 +++++--- .../sdk/io/gcp/bigquery/BigQueryServices.java | 4 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 10 +++++ .../io/gcp/bigquery/CreateTableHelpers.java | 5 +-- .../bigquery/DynamicDestinationsHelpers.java | 11 ++++-- .../StorageApiFinalizeWritesDoFn.java | 12 ++++++ .../StorageApiFlushAndFinalizeDoFn.java | 12 ++++++ .../StorageApiWriteUnshardedRecords.java | 22 +++++++---- .../StorageApiWritesShardedRecords.java | 31 +++++++++++----- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 26 ++++++++++++- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 37 ++++++++++++++++++- .../io/gcp/testing/FakeDatasetService.java | 3 ++ .../sdk/io/gcp/testing/FakeJobService.java | 3 ++ 14 files changed, 183 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java index 04d833706639..120d76f7fcc7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -25,17 +25,21 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.SinkMetrics; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Teardown; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -81,6 +85,7 @@ class BatchedStreamingWrite private final SerializableFunction toTableRow; private final SerializableFunction toFailsafeTableRow; private final Set allowedMetricUrns; + private @Nullable DatasetService datasetService; /** Tracks bytes written, exposed as "ByteCount" Counter. */ private Counter byteCounter = SinkMetrics.bytesWritten(); @@ -344,6 +349,18 @@ public void processElement( } } + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** Writes the accumulated rows into BigQuery with streaming API. */ private void flushRows( TableReference tableReference, @@ -355,8 +372,7 @@ private void flushRows( if (!tableRows.isEmpty()) { try { long totalBytes = - bqServices - .getDatasetService(options) + getDatasetService(options) .insertAll( tableReference, tableRows, @@ -374,6 +390,13 @@ private void flushRows( } } + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } + private void reportStreamingApiLogging(BigQueryOptions options) { MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer(); if (processWideContainer instanceof MetricsLogger) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 13d84bb109ae..5fcb0543a2bf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -540,12 +540,17 @@ static void verifyDatasetPresence(DatasetService datasetService, TableReference public static @Nullable BigInteger getNumRows(BigQueryOptions options, TableReference tableRef) throws InterruptedException, IOException { - DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options); - Table table = datasetService.getTable(tableRef); - if (table == null) { - return null; + try (DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options)) { + Table table = datasetService.getTable(tableRef); + if (table == null) { + return null; + } + return table.getNumRows(); + } catch (IOException | InterruptedException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); } - return table.getNumRows(); } static String getDatasetLocation( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index efc0fa981d42..c0b945ccff35 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -62,7 +62,7 @@ public interface BigQueryServices extends Serializable { StorageClient getStorageClient(BigQueryOptions bqOptions) throws IOException; /** An interface for the Cloud BigQuery load service. */ - public interface JobService { + public interface JobService extends AutoCloseable { /** Start a BigQuery load job. */ void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException; @@ -98,7 +98,7 @@ JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, S } /** An interface to get, create and delete Cloud BigQuery datasets and tables. */ - public interface DatasetService { + public interface DatasetService extends AutoCloseable { /** * Gets the specified {@link Table} resource by table ID. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 290dca235fdc..fd32a84db313 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -442,6 +442,9 @@ public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) jobRef, MAX_RPC_RETRIES), lastException); } + + @Override + public void close() throws Exception {} } @VisibleForTesting @@ -1173,6 +1176,13 @@ public ApiFuture commitWriteStreams( .addAllWriteStreams(writeStreamNames) .build()); } + + @Override + public void close() throws Exception { + this.newWriteClient.shutdownNow(); + this.newWriteClient.awaitTermination(60, TimeUnit.SECONDS); + this.newWriteClient.close(); + } } static final SerializableFunction DONT_RETRY_NOT_FOUND = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 58848ec77775..11e6b329e0f9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -106,11 +106,10 @@ private static void tryCreateTable( String tableSpec, String kmsKey, BigQueryServices bqServices) { - DatasetService datasetService = - bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); TableReference tableReference = tableDestination.getTableReference().clone(); tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); - try { + try (DatasetService datasetService = + bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) { if (datasetService.getTable(tableReference) == null) { TableSchema tableSchema = schemaSupplier.get(); checkArgument( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 18af86ca04dc..819ee9c2710b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.DoFn; @@ -358,9 +359,13 @@ private Table getBigQueryTable(TableReference tableReference) { if (tableReference.getProjectId() == null) { tableReference.setProjectId(bqOptions.getProject()); } - return bqServices.getDatasetService(bqOptions).getTable(tableReference); - } catch (InterruptedException | IOException e) { - LOG.info("Failed to get BigQuery table " + tableReference); + try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) { + return datasetService.getTable(tableReference); + } catch (InterruptedException | IOException e) { + LOG.info("Failed to get BigQuery table " + tableReference); + } + } catch (Exception e) { + throw new RuntimeException(e); } } while (nextBackOff(Sleeper.DEFAULT, backoff)); } catch (InterruptedException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java index b3b820075dcf..f935a638fa03 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java @@ -76,6 +76,18 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws return datasetService; } + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @StartBundle public void startBundle() throws IOException { commitStreams = Maps.newHashMap(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java index a3676ba05322..4cf312e22c18 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java @@ -118,6 +118,18 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws return datasetService; } + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @SuppressWarnings({"nullness"}) @ProcessElement public void process(PipelineOptions pipelineOptions, @Element KV element) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8f15121fe64e..7ecc1fac1036 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -73,7 +73,6 @@ public class StorageApiWriteUnshardedRecords private final String kmsKey; private final BigQueryServices bqServices; private final Coder destinationCoder; - @Nullable private DatasetService datasetService = null; public StorageApiWriteUnshardedRecords( StorageApiDynamicDestinations dynamicDestinations, @@ -88,12 +87,6 @@ public StorageApiWriteUnshardedRecords( this.destinationCoder = destinationCoder; } - private void initializeDatasetService(PipelineOptions pipelineOptions) { - if (datasetService == null) { - datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); - } - } - @Override public PCollection expand(PCollection> input) { String operationName = input.getName() + "/" + getName(); @@ -244,11 +237,18 @@ void flush() throws Exception { private Map destinations = Maps.newHashMap(); private final TwoLevelMessageConverterCache messageConverters; + private @Nullable DatasetService datasetService; WriteRecordsDoFn(String operationName) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); } + private void initializeDatasetService(PipelineOptions pipelineOptions) { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + } + @StartBundle public void startBundle() throws IOException { destinations = Maps.newHashMap(); @@ -318,6 +318,14 @@ public void teardown() { for (DestinationState state : destinations.values()) { state.close(); } + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 916a1bbdb529..52871845ad6f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -98,7 +98,6 @@ public class StorageApiWritesShardedRecords private final String kmsKey; private final BigQueryServices bqServices; private final Coder destinationCoder; - @Nullable private DatasetService datasetServiceInternal = null; private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); private static final Cache APPEND_CLIENTS = @@ -112,14 +111,6 @@ public class StorageApiWritesShardedRecords }) .build(); - private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { - if (datasetServiceInternal == null) { - datasetServiceInternal = - bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); - } - return datasetServiceInternal; - } - // Run a closure asynchronously, ignoring failures. private interface ThrowingRunnable { void run() throws Exception; @@ -257,6 +248,8 @@ class WriteRecordsDoFn private Map destinations = Maps.newHashMap(); + private @Nullable DatasetService datasetServiceInternal = null; + // Stores the current stream for this key. @StateId("streamName") private final StateSpec> streamNameSpec = StateSpecs.value(); @@ -295,6 +288,26 @@ String getOrCreateStream( return stream; } + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetServiceInternal == null) { + datasetServiceInternal = + bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetServiceInternal; + } + + @Teardown + public void onTeardown() { + try { + if (datasetServiceInternal != null) { + datasetServiceInternal.close(); + datasetServiceInternal = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @SuppressWarnings({"nullness"}) @ProcessElement public void process( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 07b9b34ebf33..a45f6f8db42b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -27,11 +27,13 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; @@ -62,6 +64,7 @@ class WriteRename extends DoFn>, Void> { private final CreateDisposition firstPaneCreateDisposition; private final int maxRetryJobs; private final String kmsKey; + private @Nullable DatasetService datasetService; private static class PendingJobData { final BigQueryHelpers.PendingJob retryJob; @@ -100,6 +103,18 @@ public void startBundle(StartBundleContext c) { pendingJobs.clear(); } + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { Multimap tempTables = ArrayListMultimap.create(); @@ -118,7 +133,7 @@ public void processElement(ProcessContext c) throws Exception { @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { DatasetService datasetService = - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); PendingJobManager jobManager = new PendingJobManager(); for (PendingJobData pendingJob : pendingJobs) { jobManager.addPendingJob( @@ -142,6 +157,13 @@ public void finishBundle(FinishBundleContext c) throws Exception { jobManager.waitForDone(); } + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } + private PendingJobData startWriteRename( TableDestination finalTableDestination, Iterable tempTableNames, ProcessContext c) throws Exception { @@ -163,7 +185,7 @@ private PendingJobData startWriteRename( BigQueryHelpers.PendingJob retryJob = startCopy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, finalTableDestination.getTableReference(), tempTables, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index e3b7fc68d5ef..d9687a143379 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -108,6 +109,8 @@ class WriteTables private final @Nullable String kmsKey; private final String sourceFormat; private final boolean useAvroLogicalTypes; + private @Nullable DatasetService datasetService; + private @Nullable JobService jobService; private class WriteTablesDoFn extends DoFn, List>, KV> { @@ -219,8 +222,8 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except BigQueryHelpers.PendingJob retryJob = startLoad( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, tableReference, tableDestination.getTimePartitioning(), @@ -234,6 +237,36 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference)); } + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + if (jobService != null) { + jobService.close(); + jobService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } + + private JobService getJobService(PipelineOptions pipelineOptions) throws IOException { + if (jobService == null) { + jobService = bqServices.getJobService(pipelineOptions.as(BigQueryOptions.class)); + } + return jobService; + } + @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index a716586279a3..6bfd2b839288 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -80,6 +80,9 @@ public class FakeDatasetService implements DatasetService, Serializable { static Map writeStreams; + @Override + public void close() throws Exception {} + static class Stream { final List stream; final TableContainer tableContainer; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java index 924a9962294c..e4c32f5a0cd5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java @@ -103,6 +103,9 @@ public class FakeJobService implements JobService, Serializable { private final FakeDatasetService datasetService; + @Override + public void close() throws Exception {} + private static class JobInfo { Job job; int getJobCount = 0;