From 6593ee94bf4c741a2ee4e37f15db09b8eebb68fb Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Thu, 3 Oct 2024 18:55:19 +0000 Subject: [PATCH 01/20] adds a new requested_stream function and docstring --- google/cloud/bigquery/_pandas_helpers.py | 103 ++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 210ab4875..d63d7f747 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -823,7 +823,71 @@ def _download_table_bqstorage( selected_fields=None, page_to_item=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, + max_stream_count=None, ): + + + # project_id: str, + # table: 'bigquery.table.Table', # Assuming this is a BigQuery Table object + # bqstorage_client: 'bigquery_storage.BigQueryReadClient', + # preserve_order: bool = False, + # selected_fields: Optional[List['bigquery.schema.SchemaField']] = None, + # page_to_item: Optional[Callable] = None, + # max_queue_size: Optional[int] = _MAX_QUEUE_SIZE_DEFAULT, + # max_stream_count: Optional[int] = None, + # ) -> Generator['pandas.DataFrame', None, None]: + + + """Downloads a BigQuery table using the BigQuery Storage API. + + This method uses the faster, but potentially more expensive, BigQuery + Storage API to download a table as a Pandas DataFrame. It supports + parallel downloads and optional data transformations. + + Args: + project_id (str): The ID of the Google Cloud project containing + the table. + table (bigquery.table.Table): The BigQuery table to download. + bqstorage_client (bigquery_storage.BigQueryReadClient): An + authenticated BigQuery Storage API client. + preserve_order (bool, optional): Whether to preserve the order + of the rows as they are read from BigQuery. Defaults to False. + selected_fields (Optional[List[bigquery.schema.SchemaField]], optional): + A list of BigQuery schema fields to select for download. If None, + all fields are downloaded. Defaults to None. + page_to_item (Optional[Callable], optional): An optional callable + function that takes a page of data from the BigQuery Storage API + and returns an iterable of individual items. If not provided, + each page is treated as a single item. Defaults to None. + max_queue_size (Optional[int], optional): The maximum size of + the queue used to buffer downloaded data. If None, the queue + is unbounded. Defaults to _MAX_QUEUE_SIZE_DEFAULT. + max_stream_count (Optional[int], optional): The maximum number of + concurrent streams to use for downloading data. If None, the + number of streams is determined automatically based on the + `preserve_order` parameter. Defaults to None. + + Yields: + pandas.DataFrame: Pandas DataFrames, one for each chunk of data + downloaded from BigQuery. + + Raises: + ValueError: If attempting to read from a specific partition or snapshot. + + Note: + This method requires the `google-cloud-bigquery-storage` library + to be installed. + """ + + + + + + + + + + """Use (faster, but billable) BQ Storage API to construct DataFrame.""" # Passing a BQ Storage client in implies that the BigQuery Storage library @@ -837,7 +901,9 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - requested_streams = 1 if preserve_order else 0 + # Compare preserve_order vs max_stream_count to determine how many + # streams to use. + requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.ReadSession( table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW @@ -949,6 +1015,7 @@ def download_arrow_bqstorage( preserve_order=False, selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, + max_stream_count=None, ): return _download_table_bqstorage( project_id, @@ -958,6 +1025,7 @@ def download_arrow_bqstorage( selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, max_queue_size=max_queue_size, + max_stream_count=max_stream_count, ) @@ -970,6 +1038,7 @@ def download_dataframe_bqstorage( preserve_order=False, selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, + max_stream_count=None, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( @@ -980,6 +1049,7 @@ def download_dataframe_bqstorage( selected_fields=selected_fields, page_to_item=page_to_item, max_queue_size=max_queue_size, + max_stream_count=max_stream_count, ) @@ -1024,3 +1094,34 @@ def verify_pandas_imports(): raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception if db_dtypes is None: raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception + + +def determine_requested_streams( + preserve_order: bool, + max_stream_count: Union[int, None], + ) -> int: + """Determines the value of requested_streams based on the values of + preserver_order and max_stream_count. + + Args: + preserve_order (bool): Whether to preserve the order of streams. (If True, + this limits the number of streams to one.) + max_stream_count (Union[int, None]]): The maximum number of streams + allowed. Must be a non-negative number or None, where None indicates + the value is unset. + + Returns: + (int) The appropriate value for requested_streams. + """ + + if max_stream_count is not None: + # If max_stream_count is set, use it regardless of preserve_order + return max_stream_count + elif preserve_order: + # If max_stream_count is unset but preserve_order is set, + # use 1 (to limit the max_stream_count to 1, to ensure that order + # is preserved) + return 1 + else: + # If both max_stream_count and preserve_order are unset, use 0 + return 0 From 58f3f27009eec140dff6b4f0c0096d5fe26f0b86 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Fri, 4 Oct 2024 12:24:14 +0000 Subject: [PATCH 02/20] Adds new function to compare preserve_order and max_stream_count --- google/cloud/bigquery/_pandas_helpers.py | 91 ++++++++++-------------- 1 file changed, 37 insertions(+), 54 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index d63d7f747..9b7d28f13 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -21,12 +21,15 @@ import logging import queue import warnings -from typing import Any, Union +from typing import Any, Union, Optional, Callable, Generator from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema +from google.cloud.bigquery.schema import SchemaField +from google.cloud.bigquery.table import Table +from google.cloud.bigquery_storage import BigQueryReadClient try: import pandas # type: ignore @@ -816,28 +819,15 @@ def _nowait(futures): def _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=False, - selected_fields=None, - page_to_item=None, - max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, - max_stream_count=None, -): - - - # project_id: str, - # table: 'bigquery.table.Table', # Assuming this is a BigQuery Table object - # bqstorage_client: 'bigquery_storage.BigQueryReadClient', - # preserve_order: bool = False, - # selected_fields: Optional[List['bigquery.schema.SchemaField']] = None, - # page_to_item: Optional[Callable] = None, - # max_queue_size: Optional[int] = _MAX_QUEUE_SIZE_DEFAULT, - # max_stream_count: Optional[int] = None, - # ) -> Generator['pandas.DataFrame', None, None]: - - + project_id: str, + table: Table, + bqstorage_client: BigQueryReadClient, + preserve_order: bool = False, + selected_fields: Optional[list[SchemaField]] = None, + page_to_item: Optional[Callable] = None, + max_queue_size: Optional[int] = _MAX_QUEUE_SIZE_DEFAULT, + max_stream_count: Optional[int] = None, +) -> Generator[pandas.DataFrame, None, None]: """Downloads a BigQuery table using the BigQuery Storage API. This method uses the faster, but potentially more expensive, BigQuery @@ -852,20 +842,25 @@ def _download_table_bqstorage( authenticated BigQuery Storage API client. preserve_order (bool, optional): Whether to preserve the order of the rows as they are read from BigQuery. Defaults to False. - selected_fields (Optional[List[bigquery.schema.SchemaField]], optional): + If True and `max_stream_count` is not set, this limits the number + of streams to one. If `max_stream_count` is set, that will override + values for `preserve_order`. + selected_fields (Optional[list[bigquery.schema.SchemaField]]): A list of BigQuery schema fields to select for download. If None, all fields are downloaded. Defaults to None. - page_to_item (Optional[Callable], optional): An optional callable + page_to_item (Optional[Callable]): An optional callable function that takes a page of data from the BigQuery Storage API and returns an iterable of individual items. If not provided, each page is treated as a single item. Defaults to None. - max_queue_size (Optional[int], optional): The maximum size of + max_queue_size (Optional[int]): The maximum size of the queue used to buffer downloaded data. If None, the queue is unbounded. Defaults to _MAX_QUEUE_SIZE_DEFAULT. - max_stream_count (Optional[int], optional): The maximum number of + max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If None, the number of streams is determined automatically based on the - `preserve_order` parameter. Defaults to None. + `preserve_order` parameter. If `max_stream_count` is set to a + non-negative value it will override values for `preserve_order`. + Defaults to None. Yields: pandas.DataFrame: Pandas DataFrames, one for each chunk of data @@ -878,20 +873,7 @@ def _download_table_bqstorage( This method requires the `google-cloud-bigquery-storage` library to be installed. """ - - - - - - - - - - - """Use (faster, but billable) BQ Storage API to construct DataFrame.""" - - # Passing a BQ Storage client in implies that the BigQuery Storage library - # is available and can be imported. + from google.cloud import bigquery_storage if "$" in table.table_id: @@ -901,7 +883,7 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - # Compare preserve_order vs max_stream_count to determine how many + # Compares preserve_order vs max_stream_count to determine how many # streams to use. requested_streams = determine_requested_streams(preserve_order, max_stream_count) @@ -976,7 +958,7 @@ def _download_table_bqstorage( # we want to block on the queue's get method, instead. This # prevents the queue from filling up, because the main thread # has smaller gaps in time between calls to the queue's get - # method. For a detailed explaination, see: + # method. For a detailed explanation, see: # https://friendliness.dev/2019/06/18/python-nowait/ done, not_done = _nowait(not_done) for future in done: @@ -1097,18 +1079,19 @@ def verify_pandas_imports(): def determine_requested_streams( - preserve_order: bool, - max_stream_count: Union[int, None], - ) -> int: + preserve_order: bool, + max_stream_count: Union[int, None], +) -> int: """Determines the value of requested_streams based on the values of - preserver_order and max_stream_count. + `preserve_order` and `max_stream_count`. Args: - preserve_order (bool): Whether to preserve the order of streams. (If True, - this limits the number of streams to one.) - max_stream_count (Union[int, None]]): The maximum number of streams + preserve_order (bool): Whether to preserve the order of streams. If True, + this limits the number of streams to one. + max_stream_count (Union[int, None]]): The maximum number of streams allowed. Must be a non-negative number or None, where None indicates - the value is unset. + the value is unset. If `max_stream_count` is set, it overrides + `preserve_order`. Returns: (int) The appropriate value for requested_streams. @@ -1121,7 +1104,7 @@ def determine_requested_streams( # If max_stream_count is unset but preserve_order is set, # use 1 (to limit the max_stream_count to 1, to ensure that order # is preserved) - return 1 + return 1 else: - # If both max_stream_count and preserve_order are unset, use 0 + # If both max_stream_count and preserve_order are unset, use 0 (unbounded) return 0 From f4e0d6e48a4f3f9d73792265ce5447d9ce8b0e27 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Fri, 4 Oct 2024 14:50:19 +0000 Subject: [PATCH 03/20] Updates type hints to resolve mypy errors --- google/cloud/bigquery/_pandas_helpers.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 9b7d28f13..642211ba1 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -21,7 +21,7 @@ import logging import queue import warnings -from typing import Any, Union, Optional, Callable, Generator +from typing import Any, Union, Optional, Callable, Generator, List from google.cloud.bigquery import _pyarrow_helpers @@ -78,7 +78,7 @@ def _to_wkb(v): _to_wkb = _to_wkb() try: - from google.cloud.bigquery_storage import ArrowSerializationOptions + from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions except ImportError: _ARROW_COMPRESSION_SUPPORT = False else: @@ -823,9 +823,9 @@ def _download_table_bqstorage( table: Table, bqstorage_client: BigQueryReadClient, preserve_order: bool = False, - selected_fields: Optional[list[SchemaField]] = None, + selected_fields: Optional[List[SchemaField]] = None, page_to_item: Optional[Callable] = None, - max_queue_size: Optional[int] = _MAX_QUEUE_SIZE_DEFAULT, + max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, max_stream_count: Optional[int] = None, ) -> Generator[pandas.DataFrame, None, None]: """Downloads a BigQuery table using the BigQuery Storage API. @@ -837,15 +837,15 @@ def _download_table_bqstorage( Args: project_id (str): The ID of the Google Cloud project containing the table. - table (bigquery.table.Table): The BigQuery table to download. - bqstorage_client (bigquery_storage.BigQueryReadClient): An + table (Table): The BigQuery table to download. + bqstorage_client (BigQueryReadClient): An authenticated BigQuery Storage API client. preserve_order (bool, optional): Whether to preserve the order of the rows as they are read from BigQuery. Defaults to False. If True and `max_stream_count` is not set, this limits the number of streams to one. If `max_stream_count` is set, that will override values for `preserve_order`. - selected_fields (Optional[list[bigquery.schema.SchemaField]]): + selected_fields (Optional[List[SchemaField]]): A list of BigQuery schema fields to select for download. If None, all fields are downloaded. Defaults to None. page_to_item (Optional[Callable]): An optional callable @@ -887,8 +887,9 @@ def _download_table_bqstorage( # streams to use. requested_streams = determine_requested_streams(preserve_order, max_stream_count) - requested_session = bigquery_storage.types.ReadSession( - table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW + requested_session = bigquery_storage.types.stream.ReadSession( + table=table.to_bqstorage(), + data_format=bigquery_storage.types.stream.DataFormat.ARROW, ) if selected_fields is not None: for field in selected_fields: @@ -896,7 +897,8 @@ def _download_table_bqstorage( if _ARROW_COMPRESSION_SUPPORT: requested_session.read_options.arrow_serialization_options.buffer_compression = ( - ArrowSerializationOptions.CompressionCodec.LZ4_FRAME + # CompressionCodec(1) -> LZ4_FRAME + ArrowSerializationOptions.CompressionCodec(1) ) session = bqstorage_client.create_read_session( @@ -932,7 +934,7 @@ def _download_table_bqstorage( elif max_queue_size is None: max_queue_size = 0 # unbounded - worker_queue = queue.Queue(maxsize=max_queue_size) + worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size) with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: try: From 91190d731d2b86732a98e70ba59732af3f573b72 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Fri, 4 Oct 2024 16:52:50 +0000 Subject: [PATCH 04/20] corrects some mypy, linting errors --- google/cloud/bigquery/_pandas_helpers.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 642211ba1..4bef6ff2b 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -27,10 +27,9 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema -from google.cloud.bigquery.schema import SchemaField -from google.cloud.bigquery.table import Table from google.cloud.bigquery_storage import BigQueryReadClient + try: import pandas # type: ignore @@ -820,10 +819,10 @@ def _nowait(futures): def _download_table_bqstorage( project_id: str, - table: Table, + table: Any, bqstorage_client: BigQueryReadClient, preserve_order: bool = False, - selected_fields: Optional[List[SchemaField]] = None, + selected_fields: Optional[List[Any]] = None, page_to_item: Optional[Callable] = None, max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, max_stream_count: Optional[int] = None, From 8f1f09080808b14828a74eaa0eee3469ed2f4c93 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Fri, 4 Oct 2024 17:01:41 +0000 Subject: [PATCH 05/20] updates several type hints --- google/cloud/bigquery/_pandas_helpers.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 4bef6ff2b..887d0bc73 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -27,7 +27,6 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema -from google.cloud.bigquery_storage import BigQueryReadClient try: @@ -820,7 +819,7 @@ def _nowait(futures): def _download_table_bqstorage( project_id: str, table: Any, - bqstorage_client: BigQueryReadClient, + bqstorage_client: Any, preserve_order: bool = False, selected_fields: Optional[List[Any]] = None, page_to_item: Optional[Callable] = None, @@ -836,8 +835,8 @@ def _download_table_bqstorage( Args: project_id (str): The ID of the Google Cloud project containing the table. - table (Table): The BigQuery table to download. - bqstorage_client (BigQueryReadClient): An + table (Any): The BigQuery table to download. + bqstorage_client (Any): An authenticated BigQuery Storage API client. preserve_order (bool, optional): Whether to preserve the order of the rows as they are read from BigQuery. Defaults to False. From 0e9c1b012fbed0f0484c4132c79ae4901f27c15e Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Fri, 4 Oct 2024 17:18:47 +0000 Subject: [PATCH 06/20] updates pandas type hint because of unit-noextras --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 887d0bc73..3511739b9 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -825,7 +825,7 @@ def _download_table_bqstorage( page_to_item: Optional[Callable] = None, max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, max_stream_count: Optional[int] = None, -) -> Generator[pandas.DataFrame, None, None]: +) -> Generator[Any, None, None]: """Downloads a BigQuery table using the BigQuery Storage API. This method uses the faster, but potentially more expensive, BigQuery From 6144379172bc62bf54c735093941f6e9192af711 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Mon, 7 Oct 2024 15:40:14 +0000 Subject: [PATCH 07/20] adds validation and unit tests --- google/cloud/bigquery/_pandas_helpers.py | 2 ++ tests/unit/test__pandas_helpers.py | 29 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 3511739b9..d4994d09f 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1099,6 +1099,8 @@ def determine_requested_streams( if max_stream_count is not None: # If max_stream_count is set, use it regardless of preserve_order + if max_stream_count <= -1: + raise ValueError("max_stream_count must be non-negative OR None") return max_stream_count elif preserve_order: # If max_stream_count is unset but preserve_order is set, diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index 203cc1d1c..c4ec8c8bc 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -18,6 +18,7 @@ import functools import operator import queue +from typing import Union from unittest import mock import warnings @@ -46,6 +47,7 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema +from google.cloud.bigquery._pandas_helpers import determine_requested_streams pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import() @@ -2053,3 +2055,30 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "db_dtypes", None) with pytest.raises(ValueError, match="Please install the 'db-dtypes' package"): module_under_test.verify_pandas_imports() + + +@pytest.mark.parametrize( + "preserve_order, max_stream_count, expected_requested_streams", + [ + (True, 10, 10), # max_stream_count takes precedence + (False, 5, 5), # max_stream_count takes precedence + (True, None, 1), # preserve_order (1) respected when max_stream_count is None + (False, None, 0), # Unbounded when both are unset + ], +) +def test_determine_requested_streams( + preserve_order: bool, + max_stream_count: Union[int, None], + expected_requested_streams: int, +): + """Tests various combinations of preserve_order and max_stream_count.""" + actual_requested_streams = determine_requested_streams( + preserve_order, max_stream_count + ) + assert actual_requested_streams == expected_requested_streams + + +def test_determine_requested_streams_invalid_max_stream_count(): + """Tests that a ValueError is raised if max_stream_count is negative.""" + with pytest.raises(ValueError): + determine_requested_streams(preserve_order=False, max_stream_count=-1) \ No newline at end of file From 1d635ace6e5261b95f1f741aba29e44492f624f8 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 7 Oct 2024 15:43:03 +0000 Subject: [PATCH 08/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- tests/unit/test__pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index c4ec8c8bc..e6b5f93c5 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2081,4 +2081,4 @@ def test_determine_requested_streams( def test_determine_requested_streams_invalid_max_stream_count(): """Tests that a ValueError is raised if max_stream_count is negative.""" with pytest.raises(ValueError): - determine_requested_streams(preserve_order=False, max_stream_count=-1) \ No newline at end of file + determine_requested_streams(preserve_order=False, max_stream_count=-1) From 0a0c71cb52604ac3ddbf1757d7b1bc84012ccc1b Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 8 Oct 2024 13:42:09 +0000 Subject: [PATCH 09/20] updates precedence rules and tests --- google/cloud/bigquery/_pandas_helpers.py | 27 ++++++++++++++---------- tests/unit/test__pandas_helpers.py | 10 +++++---- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index d4994d09f..e536b2530 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1087,26 +1087,31 @@ def determine_requested_streams( Args: preserve_order (bool): Whether to preserve the order of streams. If True, - this limits the number of streams to one. + this limits the number of streams to one. `preserve_order` takes + precedence over `max_stream_count`. max_stream_count (Union[int, None]]): The maximum number of streams allowed. Must be a non-negative number or None, where None indicates - the value is unset. If `max_stream_count` is set, it overrides - `preserve_order`. + the value is unset. NOTE: if `preserve_order` is also set, it takes + precedence over `max_stream_count`, thus to ensure that `max_stream_count` + is used, ensure that `preserve_order` is None. Returns: (int) The appropriate value for requested_streams. """ + + if preserve_order: + # If preserve order is set, it takes precendence. + # Limit the requested streams to 1, to ensure that order + # is preserved) + return 1 - if max_stream_count is not None: - # If max_stream_count is set, use it regardless of preserve_order + elif max_stream_count is not None: + # If preserve_order is not set, only then do we consider max_stream_count if max_stream_count <= -1: raise ValueError("max_stream_count must be non-negative OR None") return max_stream_count - elif preserve_order: - # If max_stream_count is unset but preserve_order is set, - # use 1 (to limit the max_stream_count to 1, to ensure that order - # is preserved) - return 1 + else: - # If both max_stream_count and preserve_order are unset, use 0 (unbounded) + # When preserve_order is False and max_stream_count is None, # of + # requested streams is zero (unbounded) return 0 diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index e6b5f93c5..3cf1a7120 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2060,10 +2060,12 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch): @pytest.mark.parametrize( "preserve_order, max_stream_count, expected_requested_streams", [ - (True, 10, 10), # max_stream_count takes precedence - (False, 5, 5), # max_stream_count takes precedence - (True, None, 1), # preserve_order (1) respected when max_stream_count is None - (False, None, 0), # Unbounded when both are unset + # If preserve_order is set/True, it takes precedence: + (True, 10, 1), # use 1 + (True, None, 1), # use 1 + # If preserve_order is not set check max_stream_count: + (False, 10, 10), # max_stream_count (X) takes precedence + (False, None, 0), # Unbounded (0) when both are unset ], ) def test_determine_requested_streams( From 67e7f2bcc592d155794bae9c2e44560675a10e2f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 8 Oct 2024 13:45:04 +0000 Subject: [PATCH 10/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 6 +++--- tests/unit/test__pandas_helpers.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index e536b2530..57b7dfe96 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1087,7 +1087,7 @@ def determine_requested_streams( Args: preserve_order (bool): Whether to preserve the order of streams. If True, - this limits the number of streams to one. `preserve_order` takes + this limits the number of streams to one. `preserve_order` takes precedence over `max_stream_count`. max_stream_count (Union[int, None]]): The maximum number of streams allowed. Must be a non-negative number or None, where None indicates @@ -1098,7 +1098,7 @@ def determine_requested_streams( Returns: (int) The appropriate value for requested_streams. """ - + if preserve_order: # If preserve order is set, it takes precendence. # Limit the requested streams to 1, to ensure that order @@ -1110,7 +1110,7 @@ def determine_requested_streams( if max_stream_count <= -1: raise ValueError("max_stream_count must be non-negative OR None") return max_stream_count - + else: # When preserve_order is False and max_stream_count is None, # of # requested streams is zero (unbounded) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index 3cf1a7120..3a5fddacc 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2061,10 +2061,10 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch): "preserve_order, max_stream_count, expected_requested_streams", [ # If preserve_order is set/True, it takes precedence: - (True, 10, 1), # use 1 - (True, None, 1), # use 1 + (True, 10, 1), # use 1 + (True, None, 1), # use 1 # If preserve_order is not set check max_stream_count: - (False, 10, 10), # max_stream_count (X) takes precedence + (False, 10, 10), # max_stream_count (X) takes precedence (False, None, 0), # Unbounded (0) when both are unset ], ) From 2a8bb7f286a02f7f2438d27acf9c60cefa28d0b5 Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 8 Oct 2024 13:49:41 +0000 Subject: [PATCH 11/20] updates docstring in _download_table_bqstorage --- google/cloud/bigquery/_pandas_helpers.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 57b7dfe96..4372a7f90 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -839,10 +839,9 @@ def _download_table_bqstorage( bqstorage_client (Any): An authenticated BigQuery Storage API client. preserve_order (bool, optional): Whether to preserve the order - of the rows as they are read from BigQuery. Defaults to False. - If True and `max_stream_count` is not set, this limits the number - of streams to one. If `max_stream_count` is set, that will override - values for `preserve_order`. + of the rows as they are read from BigQuery. If True this limits + the number of streams to one and overrides `max_stream_count`. + Defaults to False. selected_fields (Optional[List[SchemaField]]): A list of BigQuery schema fields to select for download. If None, all fields are downloaded. Defaults to None. @@ -854,11 +853,11 @@ def _download_table_bqstorage( the queue used to buffer downloaded data. If None, the queue is unbounded. Defaults to _MAX_QUEUE_SIZE_DEFAULT. max_stream_count (Optional[int]): The maximum number of - concurrent streams to use for downloading data. If None, the - number of streams is determined automatically based on the - `preserve_order` parameter. If `max_stream_count` is set to a - non-negative value it will override values for `preserve_order`. - Defaults to None. + concurrent streams to use for downloading data. If `preserve_order` + is True, `max_stream_count` is ignored. If `preserve_order` is False + and `max_stream_count` is set to a non-negative integer, that number + of streams will be requested. If `max_stream_count` is None, then the + number of requested streams will be unbounded. Defaults to None. Yields: pandas.DataFrame: Pandas DataFrames, one for each chunk of data From 820e4eeb377d3a8ca5311b1592c7ccedec4d5c37 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 8 Oct 2024 13:52:40 +0000 Subject: [PATCH 12/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 4372a7f90..1cb5bd2ab 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -856,7 +856,7 @@ def _download_table_bqstorage( concurrent streams to use for downloading data. If `preserve_order` is True, `max_stream_count` is ignored. If `preserve_order` is False and `max_stream_count` is set to a non-negative integer, that number - of streams will be requested. If `max_stream_count` is None, then the + of streams will be requested. If `max_stream_count` is None, then the number of requested streams will be unbounded. Defaults to None. Yields: From 656b4f7f5d332c8f165e278f11332724a5d53dd2 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 10 Oct 2024 05:12:34 -0400 Subject: [PATCH 13/20] Update google/cloud/bigquery/_pandas_helpers.py Co-authored-by: Suzy Mueller --- google/cloud/bigquery/_pandas_helpers.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 1cb5bd2ab..c9391417c 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -847,17 +847,11 @@ def _download_table_bqstorage( all fields are downloaded. Defaults to None. page_to_item (Optional[Callable]): An optional callable function that takes a page of data from the BigQuery Storage API - and returns an iterable of individual items. If not provided, - each page is treated as a single item. Defaults to None. - max_queue_size (Optional[int]): The maximum size of - the queue used to buffer downloaded data. If None, the queue - is unbounded. Defaults to _MAX_QUEUE_SIZE_DEFAULT. max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If `preserve_order` - is True, `max_stream_count` is ignored. If `preserve_order` is False - and `max_stream_count` is set to a non-negative integer, that number - of streams will be requested. If `max_stream_count` is None, then the - number of requested streams will be unbounded. Defaults to None. + is True, the requested streams are limited to 1 regardless of the + `max_stream_count` value. If 0 or None, then the number of + requested streams will be unbounded. Defaults to None. Yields: pandas.DataFrame: Pandas DataFrames, one for each chunk of data From fe6450124a0ce5e12bbee35138d7c5a319ccd90a Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 10 Oct 2024 05:12:51 -0400 Subject: [PATCH 14/20] Update google/cloud/bigquery/_pandas_helpers.py Co-authored-by: Suzy Mueller --- google/cloud/bigquery/_pandas_helpers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index c9391417c..4d66a8789 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1105,6 +1105,5 @@ def determine_requested_streams( return max_stream_count else: - # When preserve_order is False and max_stream_count is None, # of - # requested streams is zero (unbounded) + # Default to zero requested streams (unbounded). return 0 From 9de98fe58e07dd74a9add7d77861013a9a72f9f4 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 10 Oct 2024 05:13:07 -0400 Subject: [PATCH 15/20] Update google/cloud/bigquery/_pandas_helpers.py Co-authored-by: Suzy Mueller --- google/cloud/bigquery/_pandas_helpers.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 4d66a8789..8d67b6b8a 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -874,8 +874,6 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - # Compares preserve_order vs max_stream_count to determine how many - # streams to use. requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.stream.ReadSession( From 8d67a75241431e05c0f8525c537b7e3ce548624d Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 10 Oct 2024 09:14:57 +0000 Subject: [PATCH 16/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index c9391417c..91dbce0ea 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -849,7 +849,7 @@ def _download_table_bqstorage( function that takes a page of data from the BigQuery Storage API max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If `preserve_order` - is True, the requested streams are limited to 1 regardless of the + is True, the requested streams are limited to 1 regardless of the `max_stream_count` value. If 0 or None, then the number of requested streams will be unbounded. Defaults to None. From e5376135e94f96e62e5ab01e505de63508e6d097 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 10 Oct 2024 09:15:21 +0000 Subject: [PATCH 17/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 4d66a8789..afb8b2e4e 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -849,7 +849,7 @@ def _download_table_bqstorage( function that takes a page of data from the BigQuery Storage API max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If `preserve_order` - is True, the requested streams are limited to 1 regardless of the + is True, the requested streams are limited to 1 regardless of the `max_stream_count` value. If 0 or None, then the number of requested streams will be unbounded. Defaults to None. From a3160ee55d370025d6309785bef841f77e7ae32d Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 10 Oct 2024 09:15:37 +0000 Subject: [PATCH 18/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 8d67b6b8a..1e7b92a96 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -849,7 +849,7 @@ def _download_table_bqstorage( function that takes a page of data from the BigQuery Storage API max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If `preserve_order` - is True, the requested streams are limited to 1 regardless of the + is True, the requested streams are limited to 1 regardless of the `max_stream_count` value. If 0 or None, then the number of requested streams will be unbounded. Defaults to None. From 6f12c0c79b557ff7ebfc89bbc4c147d6ca574d7e Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 10 Oct 2024 09:15:59 +0000 Subject: [PATCH 19/20] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigquery/_pandas_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 8d67b6b8a..1e7b92a96 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -849,7 +849,7 @@ def _download_table_bqstorage( function that takes a page of data from the BigQuery Storage API max_stream_count (Optional[int]): The maximum number of concurrent streams to use for downloading data. If `preserve_order` - is True, the requested streams are limited to 1 regardless of the + is True, the requested streams are limited to 1 regardless of the `max_stream_count` value. If 0 or None, then the number of requested streams will be unbounded. Defaults to None. From 4da6bf6a22a12b5e91a87284dad59ce68ded2503 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 10 Oct 2024 05:17:43 -0400 Subject: [PATCH 20/20] Update _pandas_helpers.py Removed unnecessary else statement. --- google/cloud/bigquery/_pandas_helpers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 1e7b92a96..bf7d10c0f 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -1102,6 +1102,5 @@ def determine_requested_streams( raise ValueError("max_stream_count must be non-negative OR None") return max_stream_count - else: - # Default to zero requested streams (unbounded). - return 0 + # Default to zero requested streams (unbounded). + return 0