diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java index 484fe3bcd9ad..dfe797e28bf5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -86,7 +86,6 @@ class BatchedStreamingWrite private final SerializableFunction toTableRow; private final SerializableFunction toFailsafeTableRow; private final Set allowedMetricUrns; - private @Nullable DatasetService datasetService; /** Tracks bytes written, exposed as "ByteCount" Counter. */ private Counter byteCounter = SinkMetrics.bytesWritten(); @@ -222,6 +221,15 @@ private class BatchAndInsertElements extends DoFn> uniqueIdsForTableRows; + private transient @Nullable DatasetService datasetService; + + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } + /** Prepares a target BigQuery table. */ @StartBundle public void startBundle() { @@ -257,10 +265,10 @@ public void finishBundle(FinishBundleContext context) throws Exception { tableRows.entrySet()) { TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); flushRows( + getDatasetService(options), tableReference, entry.getValue(), uniqueIdsForTableRows.get(entry.getKey()), - options, failedInserts, successfulInserts); } @@ -272,6 +280,18 @@ public void finishBundle(FinishBundleContext context) throws Exception { } reportStreamingApiLogging(options); } + + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } } // The max duration input records are allowed to be buffered in the state, if using ViaStateful. @@ -325,13 +345,22 @@ public PCollectionTuple expand(PCollection>> i // shuffling. private class InsertBatchedElements extends DoFn, Iterable>>, Void> { + private transient @Nullable DatasetService datasetService; + + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } + @ProcessElement public void processElement( @Element KV, Iterable>> input, BoundedWindow window, ProcessContext context, MultiOutputReceiver out) - throws InterruptedException { + throws InterruptedException, IOException { List> tableRows = new ArrayList<>(); List uniqueIds = new ArrayList<>(); for (TableRowInfo row : input.getValue()) { @@ -347,7 +376,13 @@ public void processElement( TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey()); List> failedInserts = Lists.newArrayList(); List> successfulInserts = Lists.newArrayList(); - flushRows(tableReference, tableRows, uniqueIds, options, failedInserts, successfulInserts); + flushRows( + getDatasetService(options), + tableReference, + tableRows, + uniqueIds, + failedInserts, + successfulInserts); for (ValueInSingleWindow row : failedInserts) { out.get(failedOutputTag).output(row.getValue()); @@ -357,44 +392,43 @@ public void processElement( } reportStreamingApiLogging(options); } - } - @Teardown - public void onTeardown() { - try { - if (datasetService != null) { - datasetService.close(); - datasetService = null; + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); } } /** Writes the accumulated rows into BigQuery with streaming API. */ private void flushRows( + DatasetService datasetService, TableReference tableReference, List> tableRows, List uniqueIds, - BigQueryOptions options, List> failedInserts, List> successfulInserts) throws InterruptedException { if (!tableRows.isEmpty()) { try { long totalBytes = - getDatasetService(options) - .insertAll( - tableReference, - tableRows, - uniqueIds, - retryPolicy, - failedInserts, - errorContainer, - skipInvalidRows, - ignoreUnknownValues, - ignoreInsertIds, - successfulInserts); + datasetService.insertAll( + tableReference, + tableRows, + uniqueIds, + retryPolicy, + failedInserts, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + successfulInserts); byteCounter.inc(totalBytes); } catch (IOException e) { throw new RuntimeException(e); @@ -402,13 +436,6 @@ private void flushRows( } } - private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { - if (datasetService == null) { - datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); - } - return datasetService; - } - private void reportStreamingApiLogging(BigQueryOptions options) { MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer(); if (processWideContainer instanceof MetricsLogger) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 1d3d89491c1b..8b9b705bdb04 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -965,49 +965,53 @@ public void validate(PipelineOptions options) { // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. // For these cases the withoutValidation method can be used to disable the check. if (getValidate()) { - if (table != null) { - checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set."); - } - if (table != null && table.get().getProjectId() != null) { - // Check for source table presence for early failure notification. - DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); - BigQueryHelpers.verifyDatasetPresence(datasetService, table.get()); - BigQueryHelpers.verifyTablePresence(datasetService, table.get()); - } else if (getQuery() != null) { - checkArgument( - getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); - JobService jobService = getBigQueryServices().getJobService(bqOptions); - try { - jobService.dryRunQuery( - bqOptions.getBigQueryProject() == null - ? bqOptions.getProject() - : bqOptions.getBigQueryProject(), - new JobConfigurationQuery() - .setQuery(getQuery().get()) - .setFlattenResults(getFlattenResults()) - .setUseLegacySql(getUseLegacySql()), - getQueryLocation()); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); + try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) { + if (table != null) { + checkArgument( + table.isAccessible(), "Cannot call validate if table is dynamically set."); } + if (table != null && table.get().getProjectId() != null) { + // Check for source table presence for early failure notification. + BigQueryHelpers.verifyDatasetPresence(datasetService, table.get()); + BigQueryHelpers.verifyTablePresence(datasetService, table.get()); + } else if (getQuery() != null) { + checkArgument( + getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); + JobService jobService = getBigQueryServices().getJobService(bqOptions); + try { + jobService.dryRunQuery( + bqOptions.getBigQueryProject() == null + ? bqOptions.getProject() + : bqOptions.getBigQueryProject(), + new JobConfigurationQuery() + .setQuery(getQuery().get()) + .setFlattenResults(getFlattenResults()) + .setUseLegacySql(getUseLegacySql()), + getQueryLocation()); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); + } - DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); - // If the user provided a temp dataset, check if the dataset exists before launching the - // query - if (getQueryTempDataset() != null) { - // The temp table is only used for dataset and project id validation, not for table name - // validation - TableReference tempTable = - new TableReference() - .setProjectId( - bqOptions.getBigQueryProject() == null - ? bqOptions.getProject() - : bqOptions.getBigQueryProject()) - .setDatasetId(getQueryTempDataset()) - .setTableId("dummy table"); - BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable); + // If the user provided a temp dataset, check if the dataset exists before launching the + // query + if (getQueryTempDataset() != null) { + // The temp table is only used for dataset and project id validation, not for table + // name + // validation + TableReference tempTable = + new TableReference() + .setProjectId( + bqOptions.getBigQueryProject() == null + ? bqOptions.getProject() + : bqOptions.getBigQueryProject()) + .setDatasetId(getQueryTempDataset()) + .setTableId("dummy table"); + BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable); + } } + } catch (Exception e) { + throw new RuntimeException(e); } } } @@ -1401,15 +1405,17 @@ void cleanup(ContextContainer c) throws Exception { options.getJobName(), jobUuid, JobType.QUERY), queryTempDataset); - DatasetService datasetService = getBigQueryServices().getDatasetService(options); - LOG.info("Deleting temporary table with query results {}", tempTable); - datasetService.deleteTable(tempTable); - // Delete dataset only if it was created by Beam - boolean datasetCreatedByBeam = !queryTempDataset.isPresent(); - if (datasetCreatedByBeam) { - LOG.info( - "Deleting temporary dataset with query results {}", tempTable.getDatasetId()); - datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId()); + try (DatasetService datasetService = + getBigQueryServices().getDatasetService(options)) { + LOG.info("Deleting temporary table with query results {}", tempTable); + datasetService.deleteTable(tempTable); + // Delete dataset only if it was created by Beam + boolean datasetCreatedByBeam = !queryTempDataset.isPresent(); + if (datasetCreatedByBeam) { + LOG.info( + "Deleting temporary dataset with query results {}", tempTable.getDatasetId()); + datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId()); + } } } }; @@ -2484,17 +2490,20 @@ public void validate(PipelineOptions pipelineOptions) { // The user specified a table. if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) { TableReference table = getTableWithDefaultProject(options).get(); - DatasetService datasetService = getBigQueryServices().getDatasetService(options); - // Check for destination table presence and emptiness for early failure notification. - // Note that a presence check can fail when the table or dataset is created by an earlier - // stage of the pipeline. For these cases the #withoutValidation method can be used to - // disable the check. - BigQueryHelpers.verifyDatasetPresence(datasetService, table); - if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { - BigQueryHelpers.verifyTablePresence(datasetService, table); - } - if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { - BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); + try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) { + // Check for destination table presence and emptiness for early failure notification. + // Note that a presence check can fail when the table or dataset is created by an earlier + // stage of the pipeline. For these cases the #withoutValidation method can be used to + // disable the check. + BigQueryHelpers.verifyDatasetPresence(datasetService, table); + if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { + BigQueryHelpers.verifyTablePresence(datasetService, table); + } + if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { + BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); + } + } catch (Exception e) { + throw new RuntimeException(e); } } }