From 1eb821d4b33e7a9e08e332c94625c82fa5b6fe2c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 19 Oct 2020 16:43:12 -0500 Subject: [PATCH 1/5] fix: don't fail with 429 when downloading wide tables --- .../big_query_read/transports/grpc.py | 16 ++++++++--- synth.py | 25 +++++++++++++++++ tests/system/v1/test_reader_v1.py | 28 +++++++++++++++++++ 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index 7777e68c..88026390 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -91,10 +91,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. 
Raises: @@ -145,6 +145,10 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options={ + 'grpc.max_send_message_length': -1, + 'grpc.max_receive_message_length': -1, + }.items(), ) else: host = host if ":" in host else host + ":443" @@ -162,6 +166,10 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options={ + 'grpc.max_send_message_length': -1, + 'grpc.max_receive_message_length': -1, + }.items(), ) self._stubs = {} # type: Dict[str, Callable] diff --git a/synth.py b/synth.py index e3fa7eea..f80c4222 100644 --- a/synth.py +++ b/synth.py @@ -94,6 +94,31 @@ '\g<0>\n\n session.install("google-cloud-bigquery")', ) +# Remove client-side validation of message length. +# https://github.com/googleapis/python-bigquery-storage/issues/78 +s.replace( + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", + ( + r"host,\s*" + r"credentials=credentials,\s*" + r"credentials_file=credentials_file,\s*" + r"ssl_credentials=ssl_credentials,\s*" + r"scopes=scopes or self.AUTH_SCOPES,\s*" + r"quota_project_id=quota_project_id" + ), + """host, + credentials=credentials, + credentials_file=credentials_file, + ssl_credentials=ssl_credentials, + scopes=scopes or self.AUTH_SCOPES, + quota_project_id=quota_project_id, + options={ + 'grpc.max_send_message_length': -1, + 'grpc.max_receive_message_length': -1, + }.items()""" +) + + # We don't want the generated client to be accessible through # "google.cloud.bigquery_storage", replace it with the hand written client that # wraps it. 
diff --git a/tests/system/v1/test_reader_v1.py b/tests/system/v1/test_reader_v1.py index ff0e5b9f..a77b65f7 100644 --- a/tests/system/v1/test_reader_v1.py +++ b/tests/system/v1/test_reader_v1.py @@ -461,3 +461,31 @@ def test_resuming_read_from_offset( expected_len = 164656 # total rows in shakespeare table actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count assert actual_len == expected_len + + +def test_read_rows_to_dataframe_with_wide_table(client, project_id): + # Use a wide table to boost the chance of getting a large message size. + # https://github.com/googleapis/python-bigquery-storage/issues/78 + read_session = types.ReadSession() + read_session.table = "projects/{}/datasets/{}/tables/{}".format( + "bigquery-public-data", "geo_census_tracts", "us_census_tracts_national" + ) + read_session.data_format = types.DataFormat.ARROW + + session = client.create_read_session( + request={ + "parent": "projects/{}".format(project_id), + "read_session": read_session, + "max_stream_count": 1, + } + ) + + stream = session.streams[0].name + + read_rows_stream = client.read_rows(stream) + + # fetch the first batch of rows + pages_iter = iter(read_rows_stream.rows(session).pages) + some_rows = next(pages_iter) + + assert all(len(row["tract_geom"].as_py()) > 0 for row in some_rows) From c05c4a8fe8d6f3dd4aaefad4e8d346f67a16b366 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 19 Oct 2020 16:48:21 -0500 Subject: [PATCH 2/5] make ssl_credentials match more generic --- synth.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synth.py b/synth.py index f80c4222..9806f783 100644 --- a/synth.py +++ b/synth.py @@ -102,7 +102,7 @@ r"host,\s*" r"credentials=credentials,\s*" r"credentials_file=credentials_file,\s*" - r"ssl_credentials=ssl_credentials,\s*" + r"ssl_credentials=[a-z_]+,\s*" r"scopes=scopes or self.AUTH_SCOPES,\s*" r"quota_project_id=quota_project_id" ), @@ -113,8 +113,8 @@ scopes=scopes or self.AUTH_SCOPES, 
quota_project_id=quota_project_id, options={ - 'grpc.max_send_message_length': -1, - 'grpc.max_receive_message_length': -1, + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, }.items()""" ) From 9f104b67b906f0521de70a916c48f3a51f111a9a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 19 Oct 2020 22:08:15 +0000 Subject: [PATCH 3/5] make synth.py more robust --- .../services/big_query_read/transports/grpc.py | 16 ++++++++-------- synth.py | 12 ++++-------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index 88026390..506f2a3b 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -91,10 +91,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. 
Raises: @@ -146,8 +146,8 @@ def __init__( scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options={ - 'grpc.max_send_message_length': -1, - 'grpc.max_receive_message_length': -1, + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, }.items(), ) else: @@ -167,8 +167,8 @@ def __init__( scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options={ - 'grpc.max_send_message_length': -1, - 'grpc.max_receive_message_length': -1, + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, }.items(), ) diff --git a/synth.py b/synth.py index 9806f783..d62b974c 100644 --- a/synth.py +++ b/synth.py @@ -99,23 +99,19 @@ s.replace( "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", ( + r"type\(self\).create_channel\(\s*" r"host,\s*" r"credentials=credentials,\s*" r"credentials_file=credentials_file,\s*" - r"ssl_credentials=[a-z_]+,\s*" + r"ssl_credentials=ssl_[a-z_]*credentials,\s*" r"scopes=scopes or self.AUTH_SCOPES,\s*" r"quota_project_id=quota_project_id" ), - """host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, + """\g<0>, options={ "grpc.max_send_message_length": -1, "grpc.max_receive_message_length": -1, - }.items()""" + }.items()""", ) From 9a0b3a1734edcb3b859b32fcb6e5f9f0bafb671a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 20 Oct 2020 09:45:32 -0500 Subject: [PATCH 4/5] update synth to update tests --- .../big_query_read/transports/grpc.py | 18 +++----- .../big_query_read/transports/grpc_asyncio.py | 10 +++-- synth.py | 44 ++++++++++++------- .../test_big_query_read.py | 2 + 4 files changed, 42 insertions(+), 32 deletions(-) diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index 506f2a3b..8b7c53ff 100644 --- 
a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -91,10 +91,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -145,10 +145,7 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - }.items(), + options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), ) else: host = host if ":" in host else host + ":443" @@ -166,10 +163,7 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - }.items(), + options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), ) self._stubs = {} # type: Dict[str, Callable] diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py index c1715a24..ac1b462b 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py +++ 
b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py @@ -136,10 +136,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -190,6 +190,7 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), ) else: host = host if ":" in host else host + ":443" @@ -207,6 +208,7 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), ) # Run the base constructor. diff --git a/synth.py b/synth.py index d62b974c..9cb02d21 100644 --- a/synth.py +++ b/synth.py @@ -14,6 +14,7 @@ """This script is used to synthesize generated parts of this library.""" +import pathlib import re import synthtool as s @@ -77,7 +78,9 @@ unit_test_dependencies=optional_deps, cov_level=95, ) -s.move(templated_files, excludes=[".coveragerc"]) # microgenerator has a good .coveragerc file +s.move( + templated_files, excludes=[".coveragerc"] +) # microgenerator has a good .coveragerc file # ---------------------------------------------------------------------------- @@ -97,7 +100,9 @@ # Remove client-side validation of message length. 
# https://github.com/googleapis/python-bigquery-storage/issues/78 s.replace( - "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", + pathlib.Path( + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/" + ).glob("grpc*.py"), ( r"type\(self\).create_channel\(\s*" r"host,\s*" @@ -108,10 +113,23 @@ r"quota_project_id=quota_project_id" ), """\g<0>, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - }.items()""", + options=( + ('grpc.max_send_message_length', -1), + ('grpc.max_receive_message_length', -1) + )""", +) +s.replace( + "tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py", + ( + r"grpc_create_channel\.assert_called_once_with\(" r"[^()]+", + r"scopes=\(" r"[^()]+", + r"\),\s*" r"ssl_credentials=[a-z_]+,\s*" r"quota_project_id=None", + ), + """\g<0>, + options=( + ('grpc.max_send_message_length', -1), + ('grpc.max_receive_message_length', -1) + )""", ) @@ -121,7 +139,7 @@ s.replace( "google/cloud/bigquery_storage/__init__.py", r"from google\.cloud\.bigquery_storage_v1\.services.big_query_read.client import", - "from google.cloud.bigquery_storage_v1 import" + "from google.cloud.bigquery_storage_v1 import", ) # We also don't want to expose the async client just yet, at least not until @@ -136,7 +154,7 @@ ) s.replace( "google/cloud/bigquery_storage/__init__.py", - r"""["']BigQueryReadAsyncClient["'],\n""", + r"""["']BigQueryReadAsyncClient["'],\n""", "", ) @@ -154,11 +172,7 @@ s.replace( "google/cloud/bigquery_storage/__init__.py", r"""["']ArrowRecordBatch["']""", - ( - '"__version__",\n' - ' "types",\n' - " \g<0>" - ), + ('"__version__",\n' ' "types",\n' " \g<0>"), ) # We want to expose all types through "google.cloud.bigquery_storage.types", @@ -211,9 +225,7 @@ ), ) s.replace( - "noxfile.py", - r'--cov=tests\.unit', - '--cov=tests/unit', + "noxfile.py", r"--cov=tests\.unit", "--cov=tests/unit", ) # TODO(busunkim): Use latest sphinx after microgenerator transition diff 
--git a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py index 5ce4b3b2..1df0bf94 100644 --- a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py +++ b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py @@ -1232,6 +1232,7 @@ def test_big_query_read_transport_channel_mtls_with_client_cert_source(transport ), ssl_credentials=mock_ssl_cred, quota_project_id=None, + options=(("grpc.max_send_message_length", -1), ("grpc.max_receive_message_length", -1)), ) assert transport.grpc_channel == mock_grpc_channel @@ -1273,6 +1274,7 @@ def test_big_query_read_transport_channel_mtls_with_adc(transport_class): ), ssl_credentials=mock_ssl_cred, quota_project_id=None, + options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), ) assert transport.grpc_channel == mock_grpc_channel From 6418b3d7cb85bd63c54c132da1a590412a11b077 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 20 Oct 2020 15:03:41 +0000 Subject: [PATCH 5/5] fix updates to synth.py --- .../services/big_query_read/transports/grpc.py | 18 ++++++++++++------ .../big_query_read/transports/grpc_asyncio.py | 18 ++++++++++++------ synth.py | 15 ++++++++------- .../bigquery_storage_v1/test_big_query_read.py | 10 ++++++++-- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index 8b7c53ff..36377f1d 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -91,10 +91,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. 
- client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -145,7 +145,10 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) else: host = host if ":" in host else host + ":443" @@ -163,7 +166,10 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) self._stubs = {} # type: Dict[str, Callable] diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py index ac1b462b..b383f36d 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py @@ -136,10 +136,10 @@ def __init__( for grpc channel. It is ignored if ``channel`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. 
- client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -190,7 +190,10 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) else: host = host if ":" in host else host + ":443" @@ -208,7 +211,10 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, - options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) # Run the base constructor. diff --git a/synth.py b/synth.py index 9cb02d21..863ba860 100644 --- a/synth.py +++ b/synth.py @@ -14,7 +14,6 @@ """This script is used to synthesize generated parts of this library.""" -import pathlib import re import synthtool as s @@ -100,9 +99,10 @@ # Remove client-side validation of message length. 
# https://github.com/googleapis/python-bigquery-storage/issues/78 s.replace( - pathlib.Path( - "google/cloud/bigquery_storage_v1/services/big_query_read/transports/" - ).glob("grpc*.py"), + [ + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py", + ], ( r"type\(self\).create_channel\(\s*" r"host,\s*" @@ -121,9 +121,10 @@ s.replace( "tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py", ( - r"grpc_create_channel\.assert_called_once_with\(" r"[^()]+", - r"scopes=\(" r"[^()]+", - r"\),\s*" r"ssl_credentials=[a-z_]+,\s*" r"quota_project_id=None", + r"grpc_create_channel\.assert_called_once_with\([^()]+" + r"scopes=\([^()]+\),\s*" + r"ssl_credentials=[a-z_]+,\s*" + r"quota_project_id=None" ), """\g<0>, options=( diff --git a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py index 1df0bf94..a86692f3 100644 --- a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py +++ b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py @@ -1232,7 +1232,10 @@ def test_big_query_read_transport_channel_mtls_with_client_cert_source(transport ), ssl_credentials=mock_ssl_cred, quota_project_id=None, - options=(("grpc.max_send_message_length", -1), ("grpc.max_receive_message_length", -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) assert transport.grpc_channel == mock_grpc_channel @@ -1274,7 +1277,10 @@ def test_big_query_read_transport_channel_mtls_with_adc(transport_class): ), ssl_credentials=mock_ssl_cred, quota_project_id=None, - options=(('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1)), + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) assert transport.grpc_channel == mock_grpc_channel