From b183105cc7a8df0a072b7e97db019cd409ff2784 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Fri, 15 May 2026 12:45:37 +0000 Subject: [PATCH] Deprecate BQ legacy SQL in some cases --- .../google/docs/connections/bigquery.rst | 4 +- .../google/docs/operators/cloud/bigquery.rst | 10 +++ .../providers/google/cloud/hooks/bigquery.py | 16 ++++- .../google/cloud/operators/bigquery.py | 63 ++++++++++++++---- .../google/cloud/transfers/gcs_to_bigquery.py | 3 + .../google/tests/deprecations_ignore.yml | 7 ++ .../bigquery/example_bigquery_queries.py | 3 + .../unit/google/cloud/hooks/test_bigquery.py | 65 ++++++++++++++----- .../google/cloud/operators/test_bigquery.py | 51 +++++++++++++++ 9 files changed, 189 insertions(+), 33 deletions(-) diff --git a/providers/google/docs/connections/bigquery.rst b/providers/google/docs/connections/bigquery.rst index c596d3e86b1c4..9f6f58a4b9b49 100644 --- a/providers/google/docs/connections/bigquery.rst +++ b/providers/google/docs/connections/bigquery.rst @@ -38,7 +38,9 @@ Impersonation Scopes Use Legacy SQL - Whether or not the connection should utilize legacy SQL. + Whether or not the connection should utilize legacy SQL. GoogleSQL is the recommended dialect for + BigQuery. BigQuery legacy SQL availability is restricted after June 1, 2026, based on legacy SQL + usage during Google's evaluation period. Location One of `BigQuery locations <https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides. diff --git a/providers/google/docs/operators/cloud/bigquery.rst b/providers/google/docs/operators/cloud/bigquery.rst index db7c50b60122d..d42cda58656aa 100644 --- a/providers/google/docs/operators/cloud/bigquery.rst +++ b/providers/google/docs/operators/cloud/bigquery.rst @@ -27,6 +27,16 @@ analyzing data to find meaningful insights using familiar SQL. Airflow provides operators to manage datasets and tables, run queries and validate data. +.. note:: + + GoogleSQL is the recommended dialect for BigQuery. 
BigQuery legacy SQL availability is restricted + after June 1, 2026, based on legacy SQL usage during Google's evaluation period. In Airflow, the + implicit default for older BigQuery operators that expose ``use_legacy_sql`` is deprecated and will + change from ``True`` to ``False`` in a future provider release. Set ``use_legacy_sql=True`` + explicitly if you still need legacy SQL, or set ``use_legacy_sql=False`` to use GoogleSQL. + For more information, see + `Legacy SQL feature availability <https://cloud.google.com/bigquery/docs/legacy-sql-feature-availability>`__. + Prerequisite Tasks ^^^^^^^^^^^^^^^^^^ diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py index daf4636198564..25294f655b781 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py @@ -95,6 +95,12 @@ log = logging.getLogger(__name__) +BIGQUERY_LEGACY_SQL_DEFAULT_WARNING = ( + "The default value of `use_legacy_sql` is deprecated and will change from `True` to `False` " + "in a future provider release. Set `use_legacy_sql=True` explicitly if you need legacy SQL, " + "or set `use_legacy_sql=False` to use GoogleSQL." +) + BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob _ROUTINE_WRITABLE_PROPERTIES: tuple[str, ...] 
= ( @@ -190,7 +196,15 @@ def __init__( # Use sentinel pattern to distinguish "not provided" from "explicitly provided" if use_legacy_sql is _UNSET: value = self._get_field("use_legacy_sql", _UNSET) - self.use_legacy_sql: bool = value if value is not None else True + if value is None: + warnings.warn( + BIGQUERY_LEGACY_SQL_DEFAULT_WARNING, + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + self.use_legacy_sql = True + else: + self.use_legacy_sql = value else: self.use_legacy_sql = use_legacy_sql # type: ignore[assignment] diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py index dbce35b85b730..d182ef4b6c6c4 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py @@ -64,6 +64,15 @@ from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils.helpers import exactly_one +try: + from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet, is_arg_set +except ImportError: + from airflow.utils.types import NOTSET, ArgNotSet # type: ignore[attr-defined,no-redef] + + def is_arg_set(value): # type: ignore[misc,no-redef] + return value is not NOTSET + + if TYPE_CHECKING: from google.api_core.retry import Retry from google.cloud.bigquery import UnknownJob @@ -72,10 +81,27 @@ BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}" +BIGQUERY_LEGACY_SQL_DEFAULT_WARNING = ( + "The default value of `use_legacy_sql` is deprecated and will change from `True` to `False` " + "in a future provider release. Set `use_legacy_sql=True` explicitly if you need legacy SQL, " + "or set `use_legacy_sql=False` to use GoogleSQL." 
+) LABEL_REGEX = re.compile(r"^[\w-]{0,63}$") +def _resolve_use_legacy_sql(use_legacy_sql: bool | ArgNotSet) -> bool: + if is_arg_set(use_legacy_sql): + return use_legacy_sql + + warnings.warn( + BIGQUERY_LEGACY_SQL_DEFAULT_WARNING, + AirflowProviderDeprecationWarning, + stacklevel=3, + ) + return True + + class BigQueryUIColors(enum.Enum): """Hex colors for BigQuery operators.""" @@ -229,7 +255,7 @@ def __init__( sql: str, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -241,7 +267,7 @@ def __init__( ) -> None: super().__init__(sql=sql, **kwargs) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -278,6 +304,7 @@ def execute(self, context: Context): hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + use_legacy_sql=self.use_legacy_sql, ) if self.project_id is None: self.project_id = hook.project_id @@ -387,7 +414,7 @@ def __init__( encryption_configuration: dict | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -398,7 +425,7 @@ def __init__( super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs) self.location = location self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.encryption_configuration = encryption_configuration self.impersonation_chain = impersonation_chain 
self.labels = labels @@ -433,7 +460,11 @@ def execute(self, context: Context) -> None: if not self.deferrable: super().execute(context=context) else: - hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + hook = BigQueryHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + use_legacy_sql=self.use_legacy_sql, + ) if self.project_id is None: self.project_id = hook.project_id job = self._submit_job(hook, job_id="") @@ -549,7 +580,7 @@ def __init__( date_filter_column: str = "ds", days_back: SupportsAbs[int] = -7, gcp_conn_id: str = "google_cloud_default", - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, encryption_configuration: dict | None = None, impersonation_chain: str | Sequence[str] | None = None, @@ -568,7 +599,7 @@ def __init__( ) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.encryption_configuration = encryption_configuration self.impersonation_chain = impersonation_chain @@ -598,7 +629,11 @@ def execute(self, context: Context): if not self.deferrable: super().execute(context) else: - hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + hook = BigQueryHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + use_legacy_sql=self.use_legacy_sql, + ) self.log.info("Using ratio formula: %s", self.ratio_formula) if self.project_id is None: @@ -701,7 +736,7 @@ def __init__( encryption_configuration: dict | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -722,7 +757,7 @@ def __init__( self.accept_none = 
accept_none self.gcp_conn_id = gcp_conn_id self.encryption_configuration = encryption_configuration - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -842,7 +877,7 @@ def __init__( partition_clause: str | None = None, gcp_conn_id: str = "google_cloud_default", project_id: str = PROVIDE_PROJECT_ID, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, @@ -851,7 +886,7 @@ def __init__( ) -> None: super().__init__(table=table, checks=checks, partition_clause=partition_clause, **kwargs) self.gcp_conn_id = gcp_conn_id - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) self.location = location self.impersonation_chain = impersonation_chain self.labels = labels @@ -1037,7 +1072,7 @@ def __init__( deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), poll_interval: float = 4.0, as_dict: bool = False, - use_legacy_sql: bool = True, + use_legacy_sql: bool | ArgNotSet = NOTSET, **kwargs, ) -> None: super().__init__(**kwargs) @@ -1057,7 +1092,7 @@ def __init__( self.deferrable = deferrable self.poll_interval = poll_interval self.as_dict = as_dict - self.use_legacy_sql = use_legacy_sql + self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql) def _submit_job( self, diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index cee73e5863b9b..e57fc20110f83 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -362,6 +362,7 @@ def execute(self, context: Context): 
gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, + use_legacy_sql=False, ) self.hook = hook self.source_format = self.source_format.upper() @@ -513,6 +514,7 @@ def _find_max_value_in_column(self): gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, + use_legacy_sql=False, ) if self.max_id_key: self.log.info("Selecting the MAX value from BigQuery column %r...", self.max_id_key) @@ -854,6 +856,7 @@ def get_openlineage_facets_on_complete(self, task_instance): gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, + use_legacy_sql=False, ) project_id = self.project_id or self.hook.project_id diff --git a/providers/google/tests/deprecations_ignore.yml b/providers/google/tests/deprecations_ignore.yml index 2701b0e17f009..7aa24517eea22 100644 --- a/providers/google/tests/deprecations_ignore.yml +++ b/providers/google/tests/deprecations_ignore.yml @@ -57,6 +57,13 @@ - providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_query_with_arg - providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_with_auto_detect - providers/google/tests/unit/google/cloud/hooks/test_gcs.py::TestGCSHook::test_list__error_match_glob_and_invalid_delimiter +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperators +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryColumnCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryGetDataOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryIntervalCheckOperator +- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryTableCheckOperator +- 
providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryValueCheckOperator - providers/google/tests/unit/google/cloud/operators/test_dataproc.py::TestDataprocClusterScaleOperator::test_execute - providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryInsertJobOperator - providers/google/tests/unit/google/cloud/operators/test_dataproc.py::test_create_cluster_operator_extra_links diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py index 5c8b98de2746b..40491b1090212 100644 --- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py +++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_queries.py @@ -176,6 +176,7 @@ table_id=TABLE_1, max_results=10, selected_fields="value,name", + use_legacy_sql=False, ) # [END howto_operator_bigquery_get_data] @@ -216,6 +217,7 @@ task_id="column_check", table=f"{DATASET_NAME}.{TABLE_1}", column_mapping={"value": {"null_check": {"equal_to": 0}}}, + use_legacy_sql=False, ) # [END howto_operator_bigquery_column_check] @@ -224,6 +226,7 @@ task_id="table_check", table=f"{DATASET_NAME}.{TABLE_1}", checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}}, + use_legacy_sql=False, ) # [END howto_operator_bigquery_table_check] diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py index ddacebbdbe18a..757c503242ce0 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py @@ -17,6 +17,7 @@ # under the License. 
from __future__ import annotations +import warnings from datetime import datetime from unittest import mock from unittest.mock import AsyncMock @@ -40,10 +41,12 @@ from google.cloud.bigquery.table import _EmptyRowIterator from google.cloud.exceptions import NotFound +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import DagRun from airflow.providers.common.compat.assets import Asset from airflow.providers.common.compat.sdk import AirflowException, Context from airflow.providers.google.cloud.hooks.bigquery import ( + BIGQUERY_LEGACY_SQL_DEFAULT_WARNING, BigQueryAsyncHook, BigQueryHook, BigQueryTableAsyncHook, @@ -85,7 +88,7 @@ class MockedBigQueryHook(BigQueryHook): def get_credentials_and_project_id(self): return CREDENTIALS, PROJECT_ID - self.hook = MockedBigQueryHook() + self.hook = MockedBigQueryHook(use_legacy_sql=True) @pytest.mark.db_test @@ -1641,6 +1644,34 @@ def test_extra_time_partitioning_options(self): class TestBigQueryHookLegacySql(_BigQueryBaseTestClass): """Ensure `use_legacy_sql` param in `BigQueryHook` propagates properly.""" + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_credentials_and_project_id", + return_value=(CREDENTIALS, PROJECT_ID), + ) + def test_implicit_legacy_sql_default_warns(self, _mock_get_creds_and_proj_id): + with pytest.warns( + AirflowProviderDeprecationWarning, + match="The default value of `use_legacy_sql` is deprecated", + ): + bq_hook = BigQueryHook() + + assert bq_hook.use_legacy_sql is True + + @pytest.mark.parametrize("use_legacy_sql", [True, False]) + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_credentials_and_project_id", + return_value=(CREDENTIALS, PROJECT_ID), + ) + def test_explicit_legacy_sql_default_does_not_warn(self, _mock_get_creds_and_proj_id, use_legacy_sql): + with warnings.catch_warnings(record=True) as caught_warnings: + warnings.simplefilter("always") + bq_hook = 
BigQueryHook(use_legacy_sql=use_legacy_sql) + + assert not any( + BIGQUERY_LEGACY_SQL_DEFAULT_WARNING in str(warning.message) for warning in caught_warnings + ) + assert bq_hook.use_legacy_sql is use_legacy_sql + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.build") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor._get_query_result") @@ -1744,7 +1775,7 @@ class MockedBigQueryAsyncHook(BigQueryAsyncHook): def get_credentials_and_project_id(self): return CREDENTIALS, PROJECT_ID - self.hook = MockedBigQueryAsyncHook() + self.hook = MockedBigQueryAsyncHook(use_legacy_sql=True) @pytest.mark.db_test @pytest.mark.asyncio @@ -1754,7 +1785,7 @@ async def test_get_job_instance(self, mock_session, mock_auth_default): mock_credentials = mock.MagicMock(spec=google.auth.compute_engine.Credentials) mock_credentials.token = "ACCESS_TOKEN" mock_auth_default.return_value = (mock_credentials, PROJECT_ID) - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) result = await hook.get_job_instance(project_id=PROJECT_ID, job_id=JOB_ID, session=mock_session) assert isinstance(result, Job) @@ -1769,7 +1800,7 @@ async def test_get_job_runs_via_sync_to_async(self, mock_get_sync_hook, mock_syn mock_async_get_job = mock.AsyncMock(return_value=mock.MagicMock()) mock_sync_to_async.return_value = mock_async_get_job - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) await hook._get_job(job_id=JOB_ID, project_id=PROJECT_ID, location="US") mock_sync_to_async.assert_called_once_with(mock_sync_hook.get_job) @@ -1786,7 +1817,7 @@ async def test_get_job_runs_via_sync_to_async(self, mock_get_sync_hook, mock_syn @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook._get_job") async def test_get_job_status(self, mock_get_job, job_state, error_result, expected): - hook = BigQueryAsyncHook() + 
hook = BigQueryAsyncHook(use_legacy_sql=True) mock_get_job.return_value = mock.MagicMock(state=job_state, error_result=error_result) resp = await hook.get_job_status(job_id=JOB_ID, project_id=PROJECT_ID) assert resp == expected @@ -1794,7 +1825,7 @@ async def test_get_job_status(self, mock_get_job, job_state, error_result, expec @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") async def test_get_job_output_assert_once_with(self, mock_job_instance): - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client response = "success" @@ -1838,7 +1869,7 @@ async def test_cancel_job_failure(self, mock_job, mock_auth_default): mock_job_instance.cancel.side_effect = Exception("Cancellation failed") mock_job.return_value = mock_job_instance - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) job_id = "test_job_id" project_id = "test_project" @@ -1853,7 +1884,7 @@ async def test_cancel_job_failure(self, mock_job, mock_auth_default): @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_client_session): - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client mock_session = AsyncMock() @@ -1873,7 +1904,7 @@ async def test_create_job_for_partition_get_with_table(self, mock_job_instance, @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") async def test_create_job_for_partition_get(self, mock_job_instance, mock_client_session): - hook = BigQueryAsyncHook() + hook = 
BigQueryAsyncHook(use_legacy_sql=True) mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client mock_session = AsyncMock() @@ -1889,7 +1920,7 @@ def test_interval_check_for_airflow_exception(self): """ Assert that check return AirflowException """ - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) row1, row2, metrics_thresholds, ignore_zero, ratio_formula = ( None, @@ -1925,7 +1956,7 @@ def test_interval_check_for_success(self): """ Assert that check return None """ - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) row1, row2, metrics_thresholds, ignore_zero, ratio_formula = ( "0", @@ -1959,7 +1990,7 @@ async def test_get_job_output(self, mock_job_instance): "jobComplete": True, "cacheHit": False, } - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client mock_job_client.get_query_results.return_value = response @@ -1974,7 +2005,7 @@ def test_value_check_success(self, records, pass_value, tolerance): """ Assert that value_check method execution succeed """ - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) query = "SELECT COUNT(*) from Any" response = hook.value_check(query, pass_value, records, tolerance) assert response is None @@ -1985,7 +2016,7 @@ def test_value_check_success(self, records, pass_value, tolerance): ) def test_value_check_fail(self, records, pass_value, tolerance): """Assert that check raise AirflowException""" - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) query = "SELECT COUNT(*) from Any" with pytest.raises(AirflowException) as ex: @@ -2027,7 +2058,7 @@ async def test_get_table_client(self, mock_session, mock_auth_default): Table instance object""" mock_credentials = mock.MagicMock(spec=google.auth.compute_engine.Credentials) mock_auth_default.return_value = (mock_credentials, PROJECT_ID) - hook = 
BigQueryTableAsyncHook() + hook = BigQueryTableAsyncHook(use_legacy_sql=True) result = await hook.get_table_client( dataset=DATASET_ID, project_id=PROJECT_ID, table_id=TABLE_ID, session=mock_session ) @@ -2055,7 +2086,7 @@ def test_get_records_return_type(self): "jobComplete": True, "cacheHit": False, } - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) result = hook.get_records(query_result) assert isinstance(result[0][0], int) assert isinstance(result[0][1], float) @@ -2083,7 +2114,7 @@ def test_get_records_as_dict(self): "jobComplete": True, "cacheHit": False, } - hook = BigQueryAsyncHook() + hook = BigQueryAsyncHook(use_legacy_sql=True) result = hook.get_records(query_result, as_dict=True) assert result == [{"f0_": 22, "f1_": 3.14, "f2_": "PI"}] diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py index 537599304fcb3..284342b7f7012 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py @@ -933,6 +933,47 @@ def test_get_db_hook( mock_get_db_hook.assert_called_once() +@pytest.mark.parametrize( + ("operator_class", "kwargs"), + [ + pytest.param( + BigQueryValueCheckOperator, + {"sql": "SELECT COUNT(*) FROM Any", "pass_value": 1}, + id="value-check", + ), + pytest.param( + BigQueryIntervalCheckOperator, + {"table": TEST_TABLE_ID, "metrics_thresholds": {"COUNT(*)": 1.5}}, + id="interval-check", + ), + pytest.param( + BigQueryGetDataOperator, + {"dataset_id": TEST_DATASET, "table_id": TEST_TABLE_ID}, + id="get-data", + ), + pytest.param( + BigQueryColumnCheckOperator, + {"table": TEST_TABLE_ID, "column_mapping": {"col1": {"min": {"greater_than": 0}}}}, + id="column-check", + ), + pytest.param( + BigQueryTableCheckOperator, + { + "table": TEST_TABLE_ID, + "checks": {"row_count_check": {"check_statement": "COUNT(*) > 0"}}, + }, + id="table-check", + 
), + ], +) +def test_other_implicit_legacy_sql_default_warns(operator_class, kwargs): + with pytest.warns( + AirflowProviderDeprecationWarning, + match="The default value of `use_legacy_sql` is deprecated", + ): + operator_class(task_id=TASK_ID, **kwargs) + + class TestBigQueryUpsertTableOperator: @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_execute(self, mock_hook): @@ -2397,6 +2438,16 @@ def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook): class TestBigQueryCheckOperator: + def test_implicit_legacy_sql_default_warns(self): + with pytest.warns( + AirflowProviderDeprecationWarning, + match="The default value of `use_legacy_sql` is deprecated", + ): + BigQueryCheckOperator( + task_id="check_query", + sql="SELECT COUNT(*) FROM Any", + ) + @pytest.mark.db_test @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator._validate_records") @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.defer")