From 6ac5356c611b029d1a4e0645a7ead46b1785d543 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Thu, 6 Aug 2020 13:56:47 -0700 Subject: [PATCH 1/7] [BEAM-6064] Improvements to BQ streaming insert performance --- sdks/python/apache_beam/io/gcp/bigquery.py | 46 +++++++++++++++++-- .../apache_beam/io/gcp/bigquery_tools.py | 4 ++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 2656a84422d4..aa4e9342aaf1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -261,6 +261,7 @@ def compute_table_name(row): from apache_beam.io.iobase import RangeTracker from apache_beam.io.iobase import SourceBundle from apache_beam.io.textio import _TextSource as TextSource +from apache_beam.metrics import Metrics from apache_beam.options import value_provider as vp from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions @@ -272,6 +273,7 @@ def compute_table_name(row): from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform +from apache_beam.transforms.util import ReshufflePerKey from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX from apache_beam.transforms.sideinputs import get_sideinput_index @@ -1066,6 +1068,13 @@ def __init__( self.additional_bq_parameters = additional_bq_parameters or {} + self.batch_size_metric = Metrics.distribution(self.__class__, + "batch_size") + self.batch_latency_metric = Metrics.distribution(self.__class__, + "batch_latency_ms") + self.failed_rows_metric = Metrics.distribution(self.__class__, + "rows_failed_per_batch") + def display_data(self): return { 'max_batch_size': self._max_batch_size, @@ -1101,6 +1110,7 @@ def get_table_schema(schema): raise TypeError('Unexpected schema argument: %s.' % schema) def start_bundle(self): + logging.info('Starting a bundle') self._reset_rows_buffer() self.bigquery_wrapper = bigquery_tools.BigQueryWrapper( @@ -1193,11 +1203,13 @@ def _flush_batch(self, destination): 'Flushing data to %s. Total %s rows.', destination, len(rows_and_insert_ids)) + self.batch_size_metric.update(len(rows_and_insert_ids)) rows = [r[0] for r in rows_and_insert_ids] insert_ids = [r[1] for r in rows_and_insert_ids] while True: + start = time.time() passed, errors = self.bigquery_wrapper.insert_rows( project_id=table_reference.projectId, dataset_id=table_reference.datasetId, @@ -1205,12 +1217,14 @@ def _flush_batch(self, destination): rows=rows, insert_ids=insert_ids, skip_invalid_rows=True) + self.batch_latency_metric.update((time.time() - start) * 1000) failed_rows = [rows[entry.index] for entry in errors] should_retry = any( bigquery_tools.RetryStrategy.should_retry( self._retry_strategy, entry.errors[0].reason) for entry in errors) if not passed: + self.failed_rows_metric.update(len(failed_rows)) message = ( 'There were errors inserting to BigQuery. Will{} retry. ' 'Errors were {}'.format(("" if should_retry else " not"), errors)) @@ -1267,13 +1281,18 @@ def __init__( self.additional_bq_parameters = additional_bq_parameters class InsertIdPrefixFn(DoFn): + def __init__(self, shards=500): + self.shards = shards + def start_bundle(self): self.prefix = str(uuid.uuid4()) self._row_count = 0 def process(self, element): + import random key = element[0] value = element[1] + key = (key, random.randint(0, self.shards)) insert_id = '%s-%s' % (self.prefix, self._row_count) self._row_count += 1 @@ -1290,13 +1309,23 @@ def expand(self, input): test_client=self.test_client, additional_bq_parameters=self.additional_bq_parameters) + KEYS_PER_DEST = 500 + + def drop_shard(elms): + key_and_shard = elms[0] + key = key_and_shard[0] + value = elms[1] + return (key, value) + return ( input | 'AppendDestination' >> beam.ParDo( bigquery_tools.AppendDestinationsFn(self.table_reference), *self.table_side_inputs) - | 'AddInsertIds' >> beam.ParDo(_StreamToBigQuery.InsertIdPrefixFn()) - | 'CommitInsertIds' >> beam.Reshuffle() + | 'AddInsertIdsWithRandomKeys' >> beam.ParDo( + _StreamToBigQuery.InsertIdPrefixFn()) + | 'CommitInsertIds' >> ReshufflePerKey() + | 'DropShard' >> beam.Map(drop_shard) | 'StreamInsertRows' >> ParDo( bigquery_write_fn, *self.schema_side_inputs).with_outputs( BigQueryWriteFn.FAILED_ROWS, main='main')) @@ -1419,7 +1448,18 @@ def __init__( Default is to retry always. This means that whenever there are rows that fail to be inserted to BigQuery, they will be retried indefinitely. Other retry strategy settings will produce a deadletter PCollection - as output. + as output. Appropriate values are: + + * :attr:`bigquery_tools.RetryStrategy.RETRY_ALWAYS`: retry all rows if + there are any kind of errors. Note that this will hold your pipeline + back if there are errors until you cancel or update it. + * :attr:`bigquery_tools.RetryStrategy.RETRY_NEVER`: rows with errors + will not be retried. Instead they will be output to a dead letter + queue under the `'FailedRows'` tag. + * :attr:`bigquery_tools.RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry + rows with transient errors (e.g. timeouts). Rows with permanent errors + will be output to dead letter queue under `'FailedRows'` tag. + additional_bq_parameters (callable): A function that returns a dictionary with additional parameters to pass to BQ when creating / loading data into a table. These can be 'timePartitioning', 'clustering', etc. They diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 1230d1dac7cd..67ef1b5f4dce 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1366,8 +1366,12 @@ class AppendDestinationsFn(DoFn): Experimental; no backwards compatibility guarantees. """ def __init__(self, destination): + self._display_destination = destination self.destination = AppendDestinationsFn._get_table_fn(destination) + def display_data(self): + return {'destination': str(self._display_destination)} + @staticmethod def _value_provider_or_static_val(elm): if isinstance(elm, value_provider.ValueProvider): From 52b08843adaa753efaa76d49e01018ecc272ef85 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Thu, 6 Aug 2020 13:58:32 -0700 Subject: [PATCH 2/7] Fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index aa4e9342aaf1..741b787f6a81 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -238,6 +238,7 @@ def compute_table_name(row): import itertools import json import logging +import random import time import uuid from builtins import object @@ -1289,7 +1290,6 @@ def start_bundle(self): self._row_count = 0 def process(self, element): - import random key = element[0] value = element[1] key = (key, random.randint(0, self.shards)) From ca0857b6c38abc71b557c27a359b1a4711222835 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Thu, 6 Aug 2020 13:59:51 -0700 Subject: [PATCH 3/7] Fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 741b787f6a81..612eadb353c6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1075,6 +1075,7 @@ def __init__( "batch_latency_ms") self.failed_rows_metric = Metrics.distribution(self.__class__, "rows_failed_per_batch") + self.bigquery_wrapper = None def display_data(self): return { @@ -1111,11 +1112,11 @@ def get_table_schema(schema): raise TypeError('Unexpected schema argument: %s.' % schema) def start_bundle(self): - logging.info('Starting a bundle') self._reset_rows_buffer() - self.bigquery_wrapper = bigquery_tools.BigQueryWrapper( - client=self.test_client) + if not self.bigquery_wrapper: + self.bigquery_wrapper = bigquery_tools.BigQueryWrapper( + client=self.test_client) self._backoff_calculator = iter( retry.FuzzedExponentialIntervals( From 2b6c0bf4488a2cfdc42c77c5ad231790ede40506 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Fri, 7 Aug 2020 12:10:08 -0700 Subject: [PATCH 4/7] Fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 612eadb353c6..469f7b4c8dff 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -309,6 +309,10 @@ def compute_table_name(row): BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}" +"""The number of shards per destination when writing via streaming inserts.""" +DEFAULT_SHARDS_PER_DESTINATION = 500 + + @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference") def _parse_table_reference(table, dataset=None, project=None): return bigquery_tools.parse_table_reference(table, dataset, project) @@ -1069,12 +1073,11 @@ def __init__( self.additional_bq_parameters = additional_bq_parameters or {} - self.batch_size_metric = Metrics.distribution(self.__class__, - "batch_size") - self.batch_latency_metric = Metrics.distribution(self.__class__, - "batch_latency_ms") - self.failed_rows_metric = Metrics.distribution(self.__class__, - "rows_failed_per_batch") + self.batch_size_metric = Metrics.distribution(self.__class__, "batch_size") + self.batch_latency_metric = Metrics.distribution( + self.__class__, "batch_latency_ms") + self.failed_rows_metric = Metrics.distribution( + self.__class__, "rows_failed_per_batch") self.bigquery_wrapper = None def display_data(self): @@ -1283,7 +1286,7 @@ def __init__( self.additional_bq_parameters = additional_bq_parameters class InsertIdPrefixFn(DoFn): - def __init__(self, shards=500): + def __init__(self, shards=DEFAULT_SHARDS_PER_DESTINATION): self.shards = shards def start_bundle(self): @@ -1310,8 +1313,6 @@ def expand(self, input): test_client=self.test_client, additional_bq_parameters=self.additional_bq_parameters) - KEYS_PER_DEST = 500 - def drop_shard(elms): key_and_shard = elms[0] key = key_and_shard[0] From b2b548f5c64ee709b56b65fc4d752fbf43e74c4b Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Fri, 7 Aug 2020 15:29:04 -0700 Subject: [PATCH 5/7] Fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 469f7b4c8dff..41ce0c43ede4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -257,6 +257,7 @@ def compute_table_name(row): from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata +from apache_beam.io.gcp.bigquery_tools import RetryStrategy from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.io.iobase import BoundedSource from apache_beam.io.iobase import RangeTracker @@ -274,10 +275,10 @@ def compute_table_name(row): from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform -from apache_beam.transforms.util import ReshufflePerKey from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX from apache_beam.transforms.sideinputs import get_sideinput_index +from apache_beam.transforms.util import ReshufflePerKey from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import retry from apache_beam.utils.annotations import deprecated @@ -307,8 +308,6 @@ def compute_table_name(row): NOTE: This job name template does not have backwards compatibility guarantees. """ BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}" - - """The number of shards per destination when writing via streaming inserts.""" DEFAULT_SHARDS_PER_DESTINATION = 500 @@ -1069,7 +1068,7 @@ def __init__( self._max_buffered_rows = ( max_buffered_rows or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS) self._retry_strategy = ( - retry_strategy or bigquery_tools.RetryStrategy.RETRY_ALWAYS) + retry_strategy or RetryStrategy.RETRY_ALWAYS) self.additional_bq_parameters = additional_bq_parameters or {} @@ -1452,13 +1451,13 @@ def __init__( Other retry strategy settings will produce a deadletter PCollection as output. Appropriate values are: - * :attr:`bigquery_tools.RetryStrategy.RETRY_ALWAYS`: retry all rows if + * `RetryStrategy.RETRY_ALWAYS`: retry all rows if there are any kind of errors. Note that this will hold your pipeline back if there are errors until you cancel or update it. - * :attr:`bigquery_tools.RetryStrategy.RETRY_NEVER`: rows with errors + * `RetryStrategy.RETRY_NEVER`: rows with errors will not be retried. Instead they will be output to a dead letter queue under the `'FailedRows'` tag. - * :attr:`bigquery_tools.RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry + * `RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry rows with transient errors (e.g. timeouts). Rows with permanent errors will be output to dead letter queue under `'FailedRows'` tag. From ad922386540f1ea3c6c411227fc9e2f73eb545ba Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Fri, 7 Aug 2020 15:43:01 -0700 Subject: [PATCH 6/7] fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 41ce0c43ede4..5ccd9f9b5b96 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1067,8 +1067,7 @@ def __init__( self._max_batch_size = batch_size or BigQueryWriteFn.DEFAULT_MAX_BATCH_SIZE self._max_buffered_rows = ( max_buffered_rows or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS) - self._retry_strategy = ( - retry_strategy or RetryStrategy.RETRY_ALWAYS) + self._retry_strategy =retry_strategy or RetryStrategy.RETRY_ALWAYS self.additional_bq_parameters = additional_bq_parameters or {} @@ -1225,7 +1224,7 @@ def _flush_batch(self, destination): failed_rows = [rows[entry.index] for entry in errors] should_retry = any( - bigquery_tools.RetryStrategy.should_retry( + RetryStrategy.should_retry( self._retry_strategy, entry.errors[0].reason) for entry in errors) if not passed: self.failed_rows_metric.update(len(failed_rows)) From 1e075755b871ee99454580fc17d0ff9b89c42dc9 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Fri, 7 Aug 2020 15:48:45 -0700 Subject: [PATCH 7/7] fixup --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 5ccd9f9b5b96..c17fb244813f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1067,7 +1067,7 @@ def __init__( self._max_batch_size = batch_size or BigQueryWriteFn.DEFAULT_MAX_BATCH_SIZE self._max_buffered_rows = ( max_buffered_rows or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS) - self._retry_strategy =retry_strategy or RetryStrategy.RETRY_ALWAYS + self._retry_strategy = retry_strategy or RetryStrategy.RETRY_ALWAYS self.additional_bq_parameters = additional_bq_parameters or {}