Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class BatchedStreamingWrite<ErrorT, ElementT>
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
private final Set<String> allowedMetricUrns;
private @Nullable DatasetService datasetService;

/** Tracks bytes written, exposed as "ByteCount" Counter. */
private Counter byteCounter = SinkMetrics.bytesWritten();
Expand Down Expand Up @@ -222,6 +221,15 @@ private class BatchAndInsertElements extends DoFn<KV<String, TableRowInfo<Elemen
/** The list of unique ids for each BigQuery table row. */
private transient Map<String, List<String>> uniqueIdsForTableRows;

private transient @Nullable DatasetService datasetService;

/**
 * Returns the {@link DatasetService} cached on this DoFn instance, creating it from the given
 * pipeline options the first time it is requested.
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetService != null) {
    return datasetService;
  }
  BigQueryOptions bqOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(bqOptions);
  return datasetService;
}

/** Prepares a target BigQuery table. */
@StartBundle
public void startBundle() {
Expand Down Expand Up @@ -257,10 +265,10 @@ public void finishBundle(FinishBundleContext context) throws Exception {
tableRows.entrySet()) {
TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
flushRows(
getDatasetService(options),
tableReference,
entry.getValue(),
uniqueIdsForTableRows.get(entry.getKey()),
options,
failedInserts,
successfulInserts);
}
Expand All @@ -272,6 +280,18 @@ public void finishBundle(FinishBundleContext context) throws Exception {
}
reportStreamingApiLogging(options);
}

/**
 * Closes the cached {@link DatasetService}, if one was created, and clears the reference. Any
 * exception raised while closing is rethrown wrapped in a {@link RuntimeException}.
 */
@Teardown
public void onTeardown() {
  if (datasetService == null) {
    // Nothing was ever created for this instance, so there is nothing to release.
    return;
  }
  try {
    datasetService.close();
    datasetService = null;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
}

// The max duration input records are allowed to be buffered in the state, if using ViaStateful.
Expand Down Expand Up @@ -325,13 +345,22 @@ public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> i
// shuffling.
private class InsertBatchedElements
extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
private transient @Nullable DatasetService datasetService;

/**
 * Lazily obtains the {@link DatasetService} for this DoFn instance: the service is requested
 * from {@code bqServices} on first use and the same instance is returned afterwards.
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetService != null) {
    return datasetService;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(bigQueryOptions);
  return datasetService;
}

@ProcessElement
public void processElement(
@Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> input,
BoundedWindow window,
ProcessContext context,
MultiOutputReceiver out)
throws InterruptedException {
throws InterruptedException, IOException {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows = new ArrayList<>();
List<String> uniqueIds = new ArrayList<>();
for (TableRowInfo<ElementT> row : input.getValue()) {
Expand All @@ -347,7 +376,13 @@ public void processElement(
TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
List<ValueInSingleWindow<TableRow>> successfulInserts = Lists.newArrayList();
flushRows(tableReference, tableRows, uniqueIds, options, failedInserts, successfulInserts);
flushRows(
getDatasetService(options),
tableReference,
tableRows,
uniqueIds,
failedInserts,
successfulInserts);

for (ValueInSingleWindow<ErrorT> row : failedInserts) {
out.get(failedOutputTag).output(row.getValue());
Expand All @@ -357,58 +392,50 @@ public void processElement(
}
reportStreamingApiLogging(options);
}
}

@Teardown
public void onTeardown() {
try {
if (datasetService != null) {
datasetService.close();
datasetService = null;
@Teardown
public void onTeardown() {
try {
if (datasetService != null) {
datasetService.close();
datasetService = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** Writes the accumulated rows into BigQuery with streaming API. */
private void flushRows(
DatasetService datasetService,
TableReference tableReference,
List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows,
List<String> uniqueIds,
BigQueryOptions options,
List<ValueInSingleWindow<ErrorT>> failedInserts,
List<ValueInSingleWindow<TableRow>> successfulInserts)
throws InterruptedException {
if (!tableRows.isEmpty()) {
try {
long totalBytes =
getDatasetService(options)
.insertAll(
tableReference,
tableRows,
uniqueIds,
retryPolicy,
failedInserts,
errorContainer,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
successfulInserts);
datasetService.insertAll(
tableReference,
tableRows,
uniqueIds,
retryPolicy,
failedInserts,
errorContainer,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
successfulInserts);
byteCounter.inc(totalBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

/**
 * Returns the cached {@link DatasetService}, requesting one from {@code bqServices} with the
 * given options viewed as {@link BigQueryOptions} when none has been created yet.
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetService != null) {
    return datasetService;
  }
  BigQueryOptions asBigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(asBigQueryOptions);
  return datasetService;
}

private void reportStreamingApiLogging(BigQueryOptions options) {
MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
if (processWideContainer instanceof MetricsLogger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,49 +965,53 @@ public void validate(PipelineOptions options) {
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
if (getValidate()) {
if (table != null) {
checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");
}
if (table != null && table.get().getProjectId() != null) {
// Check for source table presence for early failure notification.
DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
BigQueryHelpers.verifyTablePresence(datasetService, table.get());
} else if (getQuery() != null) {
checkArgument(
getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
jobService.dryRunQuery(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject(),
new JobConfigurationQuery()
.setQuery(getQuery().get())
.setFlattenResults(getFlattenResults())
.setUseLegacySql(getUseLegacySql()),
getQueryLocation());
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
if (table != null) {
checkArgument(
table.isAccessible(), "Cannot call validate if table is dynamically set.");
}
if (table != null && table.get().getProjectId() != null) {
// Check for source table presence for early failure notification.
BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
BigQueryHelpers.verifyTablePresence(datasetService, table.get());
} else if (getQuery() != null) {
checkArgument(
getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
jobService.dryRunQuery(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject(),
new JobConfigurationQuery()
.setQuery(getQuery().get())
.setFlattenResults(getFlattenResults())
.setUseLegacySql(getUseLegacySql()),
getQueryLocation());
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
}

DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
// If the user provided a temp dataset, check if the dataset exists before launching the
// query
if (getQueryTempDataset() != null) {
// The temp table is only used for dataset and project id validation, not for table name
// validation
TableReference tempTable =
new TableReference()
.setProjectId(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject())
.setDatasetId(getQueryTempDataset())
.setTableId("dummy table");
BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
// If the user provided a temp dataset, check if the dataset exists before launching the
// query
if (getQueryTempDataset() != null) {
// The temp table is only used for dataset and project id validation, not for
// table name validation
TableReference tempTable =
new TableReference()
.setProjectId(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject())
.setDatasetId(getQueryTempDataset())
.setTableId("dummy table");
BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
}
}
} catch (Exception e) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this here? Can you catch more specific exceptions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() throws Exception, so this is the best I can do.

throw new RuntimeException(e);
}
}
}
Expand Down Expand Up @@ -1401,15 +1405,17 @@ void cleanup(ContextContainer c) throws Exception {
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);

DatasetService datasetService = getBigQueryServices().getDatasetService(options);
LOG.info("Deleting temporary table with query results {}", tempTable);
datasetService.deleteTable(tempTable);
// Delete dataset only if it was created by Beam
boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
if (datasetCreatedByBeam) {
LOG.info(
"Deleting temporary dataset with query results {}", tempTable.getDatasetId());
datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
try (DatasetService datasetService =
getBigQueryServices().getDatasetService(options)) {
LOG.info("Deleting temporary table with query results {}", tempTable);
datasetService.deleteTable(tempTable);
// Delete dataset only if it was created by Beam
boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
if (datasetCreatedByBeam) {
LOG.info(
"Deleting temporary dataset with query results {}", tempTable.getDatasetId());
datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
}
}
}
};
Expand Down Expand Up @@ -2484,17 +2490,20 @@ public void validate(PipelineOptions pipelineOptions) {
// The user specified a table.
if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
// disable the check.
BigQueryHelpers.verifyDatasetPresence(datasetService, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
BigQueryHelpers.verifyTablePresence(datasetService, table);
}
if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
// disable the check.
BigQueryHelpers.verifyDatasetPresence(datasetService, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
BigQueryHelpers.verifyTablePresence(datasetService, table);
}
if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
}
} catch (Exception e) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed?

throw new RuntimeException(e);
}
}
}
Expand Down