Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Teardown;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -81,6 +85,7 @@ class BatchedStreamingWrite<ErrorT, ElementT>
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
private final Set<String> allowedMetricUrns;
private @Nullable DatasetService datasetService;

/** Tracks bytes written, exposed as "ByteCount" Counter. */
private Counter byteCounter = SinkMetrics.bytesWritten();
Expand Down Expand Up @@ -344,6 +349,18 @@ public void processElement(
}
}

/**
 * Releases the cached {@link DatasetService}, if one was created, when this DoFn is torn down.
 *
 * <p>The cached reference is cleared before closing, so a failing {@code close()} cannot leave a
 * partially closed service behind for later reuse or a second close attempt.
 *
 * @throws RuntimeException wrapping any exception raised by {@code close()}
 */
@Teardown
public void onTeardown() {
  DatasetService service = datasetService;
  if (service == null) {
    return;
  }
  // Drop the cached reference first; it must not outlive a close attempt, successful or not.
  datasetService = null;
  try {
    service.close();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // close() is declared to throw Exception, which can include InterruptedException;
      // restore the flag so the interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}

/** Writes the accumulated rows into BigQuery using the streaming API. */
private void flushRows(
TableReference tableReference,
Expand All @@ -355,8 +372,7 @@ private void flushRows(
if (!tableRows.isEmpty()) {
try {
long totalBytes =
bqServices
.getDatasetService(options)
getDatasetService(options)
.insertAll(
tableReference,
tableRows,
Expand All @@ -374,6 +390,13 @@ private void flushRows(
}
}

/**
 * Returns the {@link DatasetService} cached on this instance, creating it through {@code
 * bqServices} on the first call.
 *
 * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
 * @return the cached service
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetService != null) {
    return datasetService;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(bigQueryOptions);
  return datasetService;
}

private void reportStreamingApiLogging(BigQueryOptions options) {
MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
if (processWideContainer instanceof MetricsLogger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,17 @@ static void verifyDatasetPresence(DatasetService datasetService, TableReference
public static @Nullable BigInteger getNumRows(BigQueryOptions options, TableReference tableRef)
throws InterruptedException, IOException {

DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options);
Table table = datasetService.getTable(tableRef);
if (table == null) {
return null;
try (DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options)) {
Table table = datasetService.getTable(tableRef);
if (table == null) {
return null;
}
return table.getNumRows();
} catch (IOException | InterruptedException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
return table.getNumRows();
}

static String getDatasetLocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public interface BigQueryServices extends Serializable {
StorageClient getStorageClient(BigQueryOptions bqOptions) throws IOException;

/** An interface for the Cloud BigQuery load service. */
public interface JobService {
public interface JobService extends AutoCloseable {
/** Start a BigQuery load job. */
void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException;
Expand Down Expand Up @@ -98,7 +98,7 @@ JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, S
}

/** An interface to get, create and delete Cloud BigQuery datasets and tables. */
public interface DatasetService {
public interface DatasetService extends AutoCloseable {
/**
* Gets the specified {@link Table} resource by table ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
jobRef, MAX_RPC_RETRIES),
lastException);
}

/** No-op: the body is empty, so closing this service performs no work. */
@Override
public void close() throws Exception {}
}

@VisibleForTesting
Expand Down Expand Up @@ -1173,6 +1176,13 @@ public ApiFuture<BatchCommitWriteStreamsResponse> commitWriteStreams(
.addAllWriteStreams(writeStreamNames)
.build());
}

/**
 * Shuts down the write client held in {@code newWriteClient}.
 *
 * <p>Calls {@code shutdownNow()}, then {@code awaitTermination} with a 60 second limit, then
 * {@code close()} on the client, in that order.
 *
 * @throws Exception if any of the client calls throws
 */
@Override
public void close() throws Exception {
this.newWriteClient.shutdownNow();
// NOTE(review): any result returned by awaitTermination is not inspected here, so a timeout
// reported via the return value would go unnoticed — confirm this is intended.
this.newWriteClient.awaitTermination(60, TimeUnit.SECONDS);
this.newWriteClient.close();
}
}

static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ private static void tryCreateTable(
String tableSpec,
String kmsKey,
BigQueryServices bqServices) {
DatasetService datasetService =
bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class));
TableReference tableReference = tableDestination.getTableReference().clone();
tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
try {
try (DatasetService datasetService =
bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
if (datasetService.getTable(tableReference) == null) {
TableSchema tableSchema = schemaSupplier.get();
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -358,9 +359,13 @@ private Table getBigQueryTable(TableReference tableReference) {
if (tableReference.getProjectId() == null) {
tableReference.setProjectId(bqOptions.getProject());
}
return bqServices.getDatasetService(bqOptions).getTable(tableReference);
} catch (InterruptedException | IOException e) {
LOG.info("Failed to get BigQuery table " + tableReference);
try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
return datasetService.getTable(tableReference);
} catch (InterruptedException | IOException e) {
LOG.info("Failed to get BigQuery table " + tableReference);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} while (nextBackOff(Sleeper.DEFAULT, backoff));
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws
return datasetService;
}

/**
 * Releases the cached {@link DatasetService}, if one was created, when this DoFn is torn down.
 *
 * <p>The cached reference is cleared before closing, so a failing {@code close()} cannot leave a
 * partially closed service behind for later reuse or a second close attempt.
 *
 * @throws RuntimeException wrapping any exception raised by {@code close()}
 */
@Teardown
public void onTeardown() {
  DatasetService service = datasetService;
  if (service == null) {
    return;
  }
  // Drop the cached reference first; it must not outlive a close attempt, successful or not.
  datasetService = null;
  try {
    service.close();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // close() is declared to throw Exception, which can include InterruptedException;
      // restore the flag so the interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}

@StartBundle
public void startBundle() throws IOException {
commitStreams = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws
return datasetService;
}

/**
 * Releases the cached {@link DatasetService}, if one was created, when this DoFn is torn down.
 *
 * <p>The cached reference is cleared before closing, so a failing {@code close()} cannot leave a
 * partially closed service behind for later reuse or a second close attempt.
 *
 * @throws RuntimeException wrapping any exception raised by {@code close()}
 */
@Teardown
public void onTeardown() {
  DatasetService service = datasetService;
  if (service == null) {
    return;
  }
  // Drop the cached reference first; it must not outlive a close attempt, successful or not.
  datasetService = null;
  try {
    service.close();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // close() is declared to throw Exception, which can include InterruptedException;
      // restore the flag so the interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}

@SuppressWarnings({"nullness"})
@ProcessElement
public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation> element)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
@Nullable private DatasetService datasetService = null;

public StorageApiWriteUnshardedRecords(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
Expand All @@ -88,12 +87,6 @@ public StorageApiWriteUnshardedRecords(
this.destinationCoder = destinationCoder;
}

/**
 * Creates the cached {@link DatasetService} through {@code bqServices} if it has not been
 * created yet; does nothing when one is already cached.
 *
 * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
 */
private void initializeDatasetService(PipelineOptions pipelineOptions) {
  if (datasetService != null) {
    return;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(bigQueryOptions);
}

@Override
public PCollection<Void> expand(PCollection<KV<DestinationT, ElementT>> input) {
String operationName = input.getName() + "/" + getName();
Expand Down Expand Up @@ -244,11 +237,18 @@ void flush() throws Exception {

private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
private @Nullable DatasetService datasetService;

WriteRecordsDoFn(String operationName) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
}

/**
 * Creates the cached {@link DatasetService} through {@code bqServices} if it has not been
 * created yet; does nothing when one is already cached.
 *
 * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
 */
private void initializeDatasetService(PipelineOptions pipelineOptions) {
  if (datasetService != null) {
    return;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetService = bqServices.getDatasetService(bigQueryOptions);
}

@StartBundle
public void startBundle() throws IOException {
destinations = Maps.newHashMap();
Expand Down Expand Up @@ -318,6 +318,14 @@ public void teardown() {
for (DestinationState state : destinations.values()) {
state.close();
}
try {
if (datasetService != null) {
datasetService.close();
datasetService = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
@Nullable private DatasetService datasetServiceInternal = null;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
Expand All @@ -112,14 +111,6 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
})
.build();

/**
 * Returns the {@link DatasetService} cached in {@code datasetServiceInternal}, creating it
 * through {@code bqServices} on the first call.
 *
 * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
 * @return the cached service
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetServiceInternal != null) {
    return datasetServiceInternal;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetServiceInternal = bqServices.getDatasetService(bigQueryOptions);
  return datasetServiceInternal;
}

// Run a closure asynchronously, ignoring failures.
private interface ThrowingRunnable {
void run() throws Exception;
Expand Down Expand Up @@ -257,6 +248,8 @@ class WriteRecordsDoFn

private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();

private @Nullable DatasetService datasetServiceInternal = null;

// Stores the current stream for this key.
@StateId("streamName")
private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();
Expand Down Expand Up @@ -295,6 +288,26 @@ String getOrCreateStream(
return stream;
}

/**
 * Returns the {@link DatasetService} cached in {@code datasetServiceInternal}, creating it
 * through {@code bqServices} on the first call.
 *
 * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
 * @return the cached service
 */
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
  if (datasetServiceInternal != null) {
    return datasetServiceInternal;
  }
  BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
  datasetServiceInternal = bqServices.getDatasetService(bigQueryOptions);
  return datasetServiceInternal;
}

/**
 * Releases the {@link DatasetService} cached in {@code datasetServiceInternal}, if one was
 * created, when this DoFn is torn down.
 *
 * <p>The cached reference is cleared before closing, so a failing {@code close()} cannot leave a
 * partially closed service behind for later reuse or a second close attempt.
 *
 * @throws RuntimeException wrapping any exception raised by {@code close()}
 */
@Teardown
public void onTeardown() {
  DatasetService service = datasetServiceInternal;
  if (service == null) {
    return;
  }
  // Drop the cached reference first; it must not outlive a close attempt, successful or not.
  datasetServiceInternal = null;
  try {
    service.close();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // close() is declared to throw Exception, which can include InterruptedException;
      // restore the flag so the interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}

@SuppressWarnings({"nullness"})
@ProcessElement
public void process(
Expand Down
Loading