sdks/python/apache_beam/io/gcp/bigquery.py (56 changed lines: 48 additions & 8 deletions)
@@ -238,6 +238,7 @@ def compute_table_name(row):
 import itertools
 import json
 import logging
+import random
 import time
 import uuid
 from builtins import object
@@ -256,11 +257,13 @@ def compute_table_name(row):
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
+from apache_beam.io.gcp.bigquery_tools import RetryStrategy
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.iobase import BoundedSource
 from apache_beam.io.iobase import RangeTracker
 from apache_beam.io.iobase import SourceBundle
 from apache_beam.io.textio import _TextSource as TextSource
+from apache_beam.metrics import Metrics
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -275,6 +278,7 @@ def compute_table_name(row):
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
 from apache_beam.transforms.sideinputs import get_sideinput_index
+from apache_beam.transforms.util import ReshufflePerKey
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.utils import retry
 from apache_beam.utils.annotations import deprecated
@@ -304,6 +308,8 @@ def compute_table_name(row):
 NOTE: This job name template does not have backwards compatibility guarantees.
 """
 BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
+"""The number of shards per destination when writing via streaming inserts."""
+DEFAULT_SHARDS_PER_DESTINATION = 500
 
 
 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1061,11 +1067,17 @@ def __init__(
     self._max_batch_size = batch_size or BigQueryWriteFn.DEFAULT_MAX_BATCH_SIZE
     self._max_buffered_rows = (
         max_buffered_rows or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS)
-    self._retry_strategy = (
-        retry_strategy or bigquery_tools.RetryStrategy.RETRY_ALWAYS)
+    self._retry_strategy = retry_strategy or RetryStrategy.RETRY_ALWAYS
 
     self.additional_bq_parameters = additional_bq_parameters or {}
 
+    self.batch_size_metric = Metrics.distribution(self.__class__, "batch_size")
+    self.batch_latency_metric = Metrics.distribution(
+        self.__class__, "batch_latency_ms")
+    self.failed_rows_metric = Metrics.distribution(
+        self.__class__, "rows_failed_per_batch")
+    self.bigquery_wrapper = None
+
   def display_data(self):
     return {
         'max_batch_size': self._max_batch_size,
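
The three distributions registered above use the standard Beam metrics API: Metrics.distribution(namespace, name) returns a metric whose update() records one sample, aggregated into min/max/mean/count per step. The same pattern in a standalone sketch (the class, metric names, and workload here are illustrative, not part of this PR):

import time

import apache_beam as beam
from apache_beam.metrics import Metrics


class TimedBatchFn(beam.DoFn):
  """Illustrative DoFn recording batch size and latency distributions."""
  def __init__(self):
    self.batch_size = Metrics.distribution(self.__class__, 'batch_size')
    self.latency_ms = Metrics.distribution(self.__class__, 'batch_latency_ms')

  def process(self, batch):
    start = time.time()
    self.batch_size.update(len(batch))
    # ... write the batch somewhere ...
    self.latency_ms.update(int((time.time() - start) * 1000))
    yield batch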
@@ -1103,8 +1115,9 @@ def get_table_schema(schema):
   def start_bundle(self):
     self._reset_rows_buffer()
 
-    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
-        client=self.test_client)
+    if not self.bigquery_wrapper:
+      self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
+          client=self.test_client)
 
     self._backoff_calculator = iter(
         retry.FuzzedExponentialIntervals(
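
The _backoff_calculator above (its arguments are collapsed in this view) iterates sleep intervals from apache_beam.utils.retry.FuzzedExponentialIntervals, and _flush_batch sleeps for next(...) seconds between insert attempts. A sketch of that consumption pattern; the parameter values are illustrative and do_insert() is a hypothetical operation:

import time

from apache_beam.utils import retry

backoff = iter(
    retry.FuzzedExponentialIntervals(
        initial_delay_secs=0.2, num_retries=10, max_delay_secs=30))

for attempt in range(10):
  if do_insert():  # hypothetical operation returning True on success
    break
  time.sleep(next(backoff))  # fuzzed exponential delay before retrying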
@@ -1193,24 +1206,28 @@ def _flush_batch(self, destination):
         'Flushing data to %s. Total %s rows.',
         destination,
         len(rows_and_insert_ids))
+    self.batch_size_metric.update(len(rows_and_insert_ids))
 
     rows = [r[0] for r in rows_and_insert_ids]
     insert_ids = [r[1] for r in rows_and_insert_ids]
 
     while True:
+      start = time.time()
       passed, errors = self.bigquery_wrapper.insert_rows(
           project_id=table_reference.projectId,
           dataset_id=table_reference.datasetId,
           table_id=table_reference.tableId,
           rows=rows,
           insert_ids=insert_ids,
           skip_invalid_rows=True)
+      self.batch_latency_metric.update((time.time() - start) * 1000)
 
       failed_rows = [rows[entry.index] for entry in errors]
       should_retry = any(
-          bigquery_tools.RetryStrategy.should_retry(
+          RetryStrategy.should_retry(
               self._retry_strategy, entry.errors[0].reason) for entry in errors)
       if not passed:
+        self.failed_rows_metric.update(len(failed_rows))
         message = (
             'There were errors inserting to BigQuery. Will{} retry. '
             'Errors were {}'.format(("" if should_retry else " not"), errors))
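
The retry decision delegates to RetryStrategy.should_retry(strategy, error_reason): always True for RETRY_ALWAYS, always False for RETRY_NEVER, and for RETRY_ON_TRANSIENT_ERROR a check of the reason against a fixed set of permanent errors in bigquery_tools. Illustrative calls; the exact reason strings treated as permanent are an assumption based on BigQuery insert-error reasons:

from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# 'timeout' is assumed transient, 'invalid' assumed permanent.
RetryStrategy.should_retry(RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'timeout')  # True
RetryStrategy.should_retry(RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'invalid')  # False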
@@ -1267,13 +1284,17 @@ def __init__(
     self.additional_bq_parameters = additional_bq_parameters
 
   class InsertIdPrefixFn(DoFn):
+    def __init__(self, shards=DEFAULT_SHARDS_PER_DESTINATION):
+      self.shards = shards
+
     def start_bundle(self):
       self.prefix = str(uuid.uuid4())
       self._row_count = 0
 
     def process(self, element):
       key = element[0]
       value = element[1]
+      key = (key, random.randint(0, self.shards))
 
       insert_id = '%s-%s' % (self.prefix, self._row_count)
       self._row_count += 1
@@ -1290,13 +1311,21 @@ def expand(self, input):
         test_client=self.test_client,
         additional_bq_parameters=self.additional_bq_parameters)
 
+    def drop_shard(elms):
+      key_and_shard = elms[0]
+      key = key_and_shard[0]
+      value = elms[1]
+      return (key, value)
+
     return (
         input
         | 'AppendDestination' >> beam.ParDo(
            bigquery_tools.AppendDestinationsFn(self.table_reference),
            *self.table_side_inputs)
-        | 'AddInsertIds' >> beam.ParDo(_StreamToBigQuery.InsertIdPrefixFn())
-        | 'CommitInsertIds' >> beam.Reshuffle()
+        | 'AddInsertIdsWithRandomKeys' >> beam.ParDo(
+            _StreamToBigQuery.InsertIdPrefixFn())
+        | 'CommitInsertIds' >> ReshufflePerKey()
+        | 'DropShard' >> beam.Map(drop_shard)
         | 'StreamInsertRows' >> ParDo(
             bigquery_write_fn, *self.schema_side_inputs).with_outputs(
                 BigQueryWriteFn.FAILED_ROWS, main='main'))
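
Taken together, the new steps spread each hot destination over up to DEFAULT_SHARDS_PER_DESTINATION random keys before the shuffle, then restore the original key (note that random.randint is inclusive at both ends, so there are actually shards + 1 possible shard values). Assuming the collapsed yield in InsertIdPrefixFn.process emits (key, (value, insert_id)), one element evolves roughly like this:

# After 'AppendDestination':            (destination, row)
elm = ('project:dataset.table', {'user': 'alice'})

# After 'AddInsertIdsWithRandomKeys':   ((destination, shard), (row, insert_id))
elm = (('project:dataset.table', 217), ({'user': 'alice'}, 'prefix-0'))

# 'CommitInsertIds' (ReshufflePerKey) checkpoints per (destination, shard)
# key, so a single hot table no longer funnels through one worker.

# After 'DropShard':                    (destination, (row, insert_id))
elm = ('project:dataset.table', ({'user': 'alice'}, 'prefix-0'))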
@@ -1419,7 +1448,18 @@ def __init__(
       Default is to retry always. This means that whenever there are rows
       that fail to be inserted to BigQuery, they will be retried indefinitely.
       Other retry strategy settings will produce a deadletter PCollection
-      as output.
+      as output. Appropriate values are:
+
+      * `RetryStrategy.RETRY_ALWAYS`: retry all rows if
+        there is any kind of error. Note that this will hold your pipeline
+        back if there are errors until you cancel or update it.
+      * `RetryStrategy.RETRY_NEVER`: rows with errors
+        will not be retried. Instead they will be output to a dead letter
+        queue under the `'FailedRows'` tag.
+      * `RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry
+        rows with transient errors (e.g. timeouts). Rows with permanent errors
+        will be output to the dead letter queue under the `'FailedRows'` tag.
+
     additional_bq_parameters (callable): A function that returns a dictionary
       with additional parameters to pass to BQ when creating / loading data
       into a table. These can be 'timePartitioning', 'clustering', etc. They
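The strategies documented above are selected through WriteToBigQuery's insert_retry_strategy argument. A minimal usage sketch, assuming the streaming-inserts method; the project, dataset, table, and schema are placeholders, and failed rows surface under the 'FailedRows' tag as described in the docstring:

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'name': 'a'}, {'name': 'b'}])
      | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',  # placeholder destination
          schema='name:STRING',
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
          insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
  # Rows with permanent errors come out under the 'FailedRows' tag.
  _ = result['FailedRows'] | beam.Map(print)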
sdks/python/apache_beam/io/gcp/bigquery_tools.py (4 changed lines: 4 additions & 0 deletions)
@@ -1366,8 +1366,12 @@ class AppendDestinationsFn(DoFn):
   Experimental; no backwards compatibility guarantees.
   """
   def __init__(self, destination):
+    self._display_destination = destination
     self.destination = AppendDestinationsFn._get_table_fn(destination)
 
+  def display_data(self):
+    return {'destination': str(self._display_destination)}
+
   @staticmethod
   def _value_provider_or_static_val(elm):
     if isinstance(elm, value_provider.ValueProvider):
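The display_data addition above surfaces the write destination in pipeline monitoring UIs. Any DoFn can expose its parameters the same way; a minimal sketch with an illustrative class, using the DisplayDataItem helper (already imported in bigquery.py) to attach a human-readable label:

from apache_beam import DoFn
from apache_beam.transforms.display import DisplayDataItem


class WriteRowsFn(DoFn):
  """Illustrative DoFn exposing its configuration as display data."""
  def __init__(self, table):
    self._table = table

  def display_data(self):
    # Plain primitives also work; DisplayDataItem adds an optional label.
    return {'table': DisplayDataItem(str(self._table), label='Output Table')}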