Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion providers/google/docs/connections/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ Impersonation Scopes


Use Legacy SQL
Whether or not the connection should utilize legacy SQL.
Whether or not the connection should utilize legacy SQL. GoogleSQL is the recommended dialect for
BigQuery. BigQuery legacy SQL availability is restricted after June 1, 2026, based on legacy SQL
usage during Google's evaluation period.

Location
One of `BigQuery locations <https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides.
Expand Down
10 changes: 10 additions & 0 deletions providers/google/docs/operators/cloud/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ analyzing data to find meaningful insights using familiar SQL.
Airflow provides operators to manage datasets and tables, run queries and validate
data.

.. note::

GoogleSQL is the recommended dialect for BigQuery. BigQuery legacy SQL availability is restricted
after June 1, 2026, based on legacy SQL usage during Google's evaluation period. In Airflow, the
implicit default for older BigQuery operators that expose ``use_legacy_sql`` is deprecated and will
change from ``True`` to ``False`` in a future provider release. Set ``use_legacy_sql=True``
explicitly if you still need legacy SQL, or set ``use_legacy_sql=False`` to use GoogleSQL.
For more information, see
`Legacy SQL feature availability <https://docs.cloud.google.com/bigquery/docs/legacy-sql-feature-availability>`__.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@

log = logging.getLogger(__name__)

BIGQUERY_LEGACY_SQL_DEFAULT_WARNING = (
"The default value of `use_legacy_sql` is deprecated and will change from `True` to `False` "
"in a future provider release. Set `use_legacy_sql=True` explicitly if you need legacy SQL, "
"or set `use_legacy_sql=False` to use GoogleSQL."
)

BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob

_ROUTINE_WRITABLE_PROPERTIES: tuple[str, ...] = (
Expand Down Expand Up @@ -190,7 +196,15 @@ def __init__(
# Use sentinel pattern to distinguish "not provided" from "explicitly provided"
if use_legacy_sql is _UNSET:
value = self._get_field("use_legacy_sql", _UNSET)
self.use_legacy_sql: bool = value if value is not None else True
if value is None:
warnings.warn(
BIGQUERY_LEGACY_SQL_DEFAULT_WARNING,
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self.use_legacy_sql = True
else:
self.use_legacy_sql = value
Comment thread
shahar1 marked this conversation as resolved.
else:
self.use_legacy_sql = use_legacy_sql # type: ignore[assignment]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
Comment thread
VladaZakharova marked this conversation as resolved.
from airflow.utils.helpers import exactly_one

try:
from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet, is_arg_set
except ImportError:
from airflow.utils.types import NOTSET, ArgNotSet # type: ignore[attr-defined,no-redef]

def is_arg_set(value): # type: ignore[misc,no-redef]
return value is not NOTSET


if TYPE_CHECKING:
from google.api_core.retry import Retry
from google.cloud.bigquery import UnknownJob
Expand All @@ -72,10 +81,27 @@


BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
BIGQUERY_LEGACY_SQL_DEFAULT_WARNING = (
"The default value of `use_legacy_sql` is deprecated and will change from `True` to `False` "
"in a future provider release. Set `use_legacy_sql=True` explicitly if you need legacy SQL, "
"or set `use_legacy_sql=False` to use GoogleSQL."
)

LABEL_REGEX = re.compile(r"^[\w-]{0,63}$")


def _resolve_use_legacy_sql(use_legacy_sql: bool | ArgNotSet) -> bool:
if is_arg_set(use_legacy_sql):
return use_legacy_sql

warnings.warn(
BIGQUERY_LEGACY_SQL_DEFAULT_WARNING,
AirflowProviderDeprecationWarning,
stacklevel=3,
)
return True


class BigQueryUIColors(enum.Enum):
"""Hex colors for BigQuery operators."""

Expand Down Expand Up @@ -229,7 +255,7 @@ def __init__(
sql: str,
gcp_conn_id: str = "google_cloud_default",
project_id: str = PROVIDE_PROJECT_ID,
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
Expand All @@ -241,7 +267,7 @@ def __init__(
) -> None:
super().__init__(sql=sql, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)
self.location = location
self.impersonation_chain = impersonation_chain
self.labels = labels
Expand Down Expand Up @@ -278,6 +304,7 @@ def execute(self, context: Context):
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=self.use_legacy_sql,
)
if self.project_id is None:
self.project_id = hook.project_id
Expand Down Expand Up @@ -387,7 +414,7 @@ def __init__(
encryption_configuration: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
project_id: str = PROVIDE_PROJECT_ID,
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
Expand All @@ -398,7 +425,7 @@ def __init__(
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
self.location = location
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)
self.encryption_configuration = encryption_configuration
self.impersonation_chain = impersonation_chain
self.labels = labels
Expand Down Expand Up @@ -433,7 +460,11 @@ def execute(self, context: Context) -> None:
if not self.deferrable:
super().execute(context=context)
else:
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=self.use_legacy_sql,
)
if self.project_id is None:
self.project_id = hook.project_id
job = self._submit_job(hook, job_id="")
Expand Down Expand Up @@ -549,7 +580,7 @@ def __init__(
date_filter_column: str = "ds",
days_back: SupportsAbs[int] = -7,
gcp_conn_id: str = "google_cloud_default",
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
location: str | None = None,
encryption_configuration: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
Expand All @@ -568,7 +599,7 @@ def __init__(
)

self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)
self.location = location
self.encryption_configuration = encryption_configuration
self.impersonation_chain = impersonation_chain
Expand Down Expand Up @@ -598,7 +629,11 @@ def execute(self, context: Context):
if not self.deferrable:
super().execute(context)
else:
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=self.use_legacy_sql,
)
self.log.info("Using ratio formula: %s", self.ratio_formula)

if self.project_id is None:
Expand Down Expand Up @@ -701,7 +736,7 @@ def __init__(
encryption_configuration: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
project_id: str = PROVIDE_PROJECT_ID,
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
Expand All @@ -722,7 +757,7 @@ def __init__(
self.accept_none = accept_none
self.gcp_conn_id = gcp_conn_id
self.encryption_configuration = encryption_configuration
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)
self.location = location
self.impersonation_chain = impersonation_chain
self.labels = labels
Expand Down Expand Up @@ -842,7 +877,7 @@ def __init__(
partition_clause: str | None = None,
gcp_conn_id: str = "google_cloud_default",
project_id: str = PROVIDE_PROJECT_ID,
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
Expand All @@ -851,7 +886,7 @@ def __init__(
) -> None:
super().__init__(table=table, checks=checks, partition_clause=partition_clause, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)
self.location = location
self.impersonation_chain = impersonation_chain
self.labels = labels
Expand Down Expand Up @@ -1037,7 +1072,7 @@ def __init__(
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 4.0,
as_dict: bool = False,
use_legacy_sql: bool = True,
use_legacy_sql: bool | ArgNotSet = NOTSET,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -1057,7 +1092,7 @@ def __init__(
self.deferrable = deferrable
self.poll_interval = poll_interval
self.as_dict = as_dict
self.use_legacy_sql = use_legacy_sql
self.use_legacy_sql = _resolve_use_legacy_sql(use_legacy_sql)

def _submit_job(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def execute(self, context: Context):
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=False,
)
self.hook = hook
self.source_format = self.source_format.upper()
Expand Down Expand Up @@ -513,6 +514,7 @@ def _find_max_value_in_column(self):
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=False,
)
if self.max_id_key:
self.log.info("Selecting the MAX value from BigQuery column %r...", self.max_id_key)
Expand Down Expand Up @@ -854,6 +856,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=False,
)

project_id = self.project_id or self.hook.project_id
Expand Down
7 changes: 7 additions & 0 deletions providers/google/tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@
- providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_query_with_arg
- providers/google/tests/unit/google/cloud/hooks/test_bigquery.py::TestTimePartitioningInRunJob::test_run_with_auto_detect
- providers/google/tests/unit/google/cloud/hooks/test_gcs.py::TestGCSHook::test_list__error_match_glob_and_invalid_delimiter
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperator
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryCheckOperators
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryColumnCheckOperator
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryGetDataOperator
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryIntervalCheckOperator
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryTableCheckOperator
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryValueCheckOperator
- providers/google/tests/unit/google/cloud/operators/test_dataproc.py::TestDataprocClusterScaleOperator::test_execute
- providers/google/tests/unit/google/cloud/operators/test_bigquery.py::TestBigQueryInsertJobOperator
- providers/google/tests/unit/google/cloud/operators/test_dataproc.py::test_create_cluster_operator_extra_links
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
use_legacy_sql=False,
)
# [END howto_operator_bigquery_get_data]

Expand Down Expand Up @@ -216,6 +217,7 @@
task_id="column_check",
table=f"{DATASET_NAME}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
use_legacy_sql=False,
)
# [END howto_operator_bigquery_column_check]

Expand All @@ -224,6 +226,7 @@
task_id="table_check",
table=f"{DATASET_NAME}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
use_legacy_sql=False,
)
# [END howto_operator_bigquery_table_check]

Expand Down
Loading
Loading