From 46a69d6297dec217266ccb38b6192801ba2fbd34 Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Sun, 11 Jan 2026 23:45:52 +0530 Subject: [PATCH 1/6] Enhance Cloud Run integration by adding optional transport parameter for API requests. This allows users to specify 'rest' or 'grpc' transport methods across CloudRunHook, CloudRunAsyncHook, CloudRunExecuteJobOperator, and CloudRunJobFinishedTrigger classes --- .../providers/google/cloud/hooks/cloud_run.py | 24 ++++++++++++++++--- .../google/cloud/operators/cloud_run.py | 17 +++++++++++-- .../google/cloud/triggers/cloud_run.py | 7 ++++++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py index ca67373ec64dd..f3290ec40673f 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py @@ -67,16 +67,21 @@ class CloudRunHook(GoogleBaseHook): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. + :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. + Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment + (e.g., Docker containers with certain network configurations). """ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + transport: str | None = None, **kwargs, ) -> None: super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs) self._client: JobsClient | None = None + self.transport = transport def get_conn(self): """ @@ -85,7 +90,10 @@ def get_conn(self): :return: Cloud Run Jobs client object. """ if self._client is None: - self._client = JobsClient(credentials=self.get_credentials(), client_info=CLIENT_INFO) + client_kwargs = {"credentials": self.get_credentials(), "client_info": CLIENT_INFO} + if self.transport is not None: + client_kwargs["transport"] = self.transport + self._client = JobsClient(**client_kwargs) return self._client @GoogleBaseHook.fallback_to_default_project_id @@ -176,6 +184,9 @@ class CloudRunAsyncHook(GoogleBaseAsyncHook): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. + :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. + Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment + (e.g., Docker containers with certain network configurations). """ sync_hook_class = CloudRunHook @@ -184,15 +195,22 @@ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + transport: str | None = None, **kwargs, ): self._client: JobsAsyncClient | None = None - super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs) + self.transport = transport + super().__init__( + gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, transport=transport, **kwargs + ) async def get_conn(self): if self._client is None: sync_hook = await self.get_sync_hook() - self._client = JobsAsyncClient(credentials=sync_hook.get_credentials(), client_info=CLIENT_INFO) + client_kwargs = {"credentials": sync_hook.get_credentials(), "client_info": CLIENT_INFO} + if self.transport is not None: + client_kwargs["transport"] = self.transport + self._client = JobsAsyncClient(**client_kwargs) return self._client diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py index 7e7b5faf1b4a7..30442dfb4674c 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py @@ -263,6 +263,9 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). :param deferrable: Run the operator in deferrable mode. + :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. + Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment + (e.g., Docker containers with certain network configurations). """ operator_extra_links = (CloudRunJobLoggingLink(),) @@ -275,6 +278,7 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator): "overrides", "polling_period_seconds", "timeout_seconds", + "transport", ) def __init__( @@ -288,6 +292,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + transport: str | None = None, **kwargs, ): super().__init__(**kwargs) @@ -300,11 +305,14 @@ def __init__( self.polling_period_seconds = polling_period_seconds self.timeout_seconds = timeout_seconds self.deferrable = deferrable + self.transport = transport self.operation: operation.Operation | None = None def execute(self, context: Context): hook: CloudRunHook = CloudRunHook( - gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + transport=self.transport, ) self.operation = hook.execute_job( region=self.region, project_id=self.project_id, job_name=self.job_name, overrides=self.overrides @@ -333,6 +341,7 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, polling_period_seconds=self.polling_period_seconds, + transport=self.transport, ), method_name="execute_complete", ) @@ -350,7 +359,11 @@ def execute_complete(self, context: Context, event: dict): f"Operation failed with error code [{error_code}] and error message [{error_message}]" ) - hook: CloudRunHook = CloudRunHook(self.gcp_conn_id, self.impersonation_chain) + hook: CloudRunHook = CloudRunHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + transport=self.transport, + ) job = hook.get_job(job_name=event["job_name"], region=self.region, project_id=self.project_id) return Job.to_dict(job) diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py index b5547f45bac7e..38afe677efb8b 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py @@ -59,6 +59,9 @@ class CloudRunJobFinishedTrigger(BaseTrigger): account from the list granting this role to the originating account (templated). :param poll_sleep: Polling period in seconds to check for the status. :timeout: The time to wait before failing the operation. + :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. + Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment + (e.g., Docker containers with certain network configurations). """ def __init__( @@ -71,6 +74,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, polling_period_seconds: float = 10, timeout: float | None = None, + transport: str | None = None, ): super().__init__() self.project_id = project_id @@ -81,6 +85,7 @@ def __init__( self.polling_period_seconds = polling_period_seconds self.timeout = timeout self.impersonation_chain = impersonation_chain + self.transport = transport def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize class arguments and classpath.""" @@ -95,6 +100,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "polling_period_seconds": self.polling_period_seconds, "timeout": self.timeout, "impersonation_chain": self.impersonation_chain, + "transport": self.transport, }, ) @@ -143,4 +149,5 @@ def _get_async_hook(self) -> CloudRunAsyncHook: return CloudRunAsyncHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + transport=self.transport, ) From 928d0aa9b24a8ae7f99a819fc04e06647eb80cb2 Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Mon, 12 Jan 2026 00:00:39 +0530 Subject: [PATCH 2/6] Add tests for transport parameter in CloudRunHook and CloudRunExecuteJobOperator --- .../unit/google/cloud/hooks/test_cloud_run.py | 30 +++++++++++++++++++ .../google/cloud/operators/test_cloud_run.py | 22 ++++++++++++++ .../google/cloud/triggers/test_cloud_run.py | 1 + 3 files changed, 53 insertions(+) diff --git a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py index bccea8c3e34f7..71303f35b6cc2 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py @@ -259,6 +259,36 @@ def test_delete_job(self, mock_batch_service_client, cloud_run_hook): cloud_run_hook.delete_job(job_name=JOB_NAME, region=REGION, project_id=PROJECT_ID) cloud_run_hook._client.delete_job.assert_called_once_with(delete_request) + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ) + @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient") + def test_get_conn_with_transport(self, mock_jobs_client): + """Test that transport parameter is passed to JobsClient.""" + hook = CloudRunHook(transport="rest") + hook.get_credentials = self.dummy_get_credentials + hook.get_conn() + + mock_jobs_client.assert_called_once() + call_kwargs = mock_jobs_client.call_args[1] + assert call_kwargs["transport"] == "rest" + + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ) + @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient") + def test_get_conn_without_transport(self, mock_jobs_client): + """Test that JobsClient is created without transport when not specified.""" + hook = CloudRunHook() + hook.get_credentials = self.dummy_get_credentials + hook.get_conn() + + mock_jobs_client.assert_called_once() + call_kwargs = mock_jobs_client.call_args[1] + assert "transport" not in call_kwargs + def _mock_pager(self, number_of_jobs): mock_pager = [] for i in range(number_of_jobs): diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py index d4431877121e2..3c389713b1ab4 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py @@ -102,6 +102,28 @@ def test_template_fields(self): assert "overrides" in operator.template_fields assert "polling_period_seconds" in operator.template_fields assert "timeout_seconds" in operator.template_fields + assert "transport" in operator.template_fields + + @mock.patch(CLOUD_RUN_HOOK_PATH) + def test_execute_with_transport(self, hook_mock): + """Test that transport parameter is passed to CloudRunHook.""" + hook_mock.return_value.get_job.return_value = JOB + hook_mock.return_value.execute_job.return_value = self._mock_operation(3, 3, 0) + + operator = CloudRunExecuteJobOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + job_name=JOB_NAME, + transport="rest", + ) + + operator.execute(context=mock.MagicMock()) + + # Verify that CloudRunHook was instantiated with transport parameter + hook_mock.assert_called_once() + call_kwargs = hook_mock.call_args[1] + assert call_kwargs["transport"] == "rest" @mock.patch(CLOUD_RUN_HOOK_PATH) def test_execute_success(self, hook_mock): diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py index 7a526d590c265..25f2f48831a29 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py @@ -65,6 +65,7 @@ def test_serialization(self, trigger): "polling_period_seconds": POLL_SLEEP, "timeout": TIMEOUT, "impersonation_chain": IMPERSONATION_CHAIN, + "transport": None, } @pytest.mark.asyncio From 45449d0564d417344a957794ccf38dcc2daa3c2f Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Mon, 12 Jan 2026 14:09:15 +0530 Subject: [PATCH 3/6] Update Cloud Run integration to enforce 'grpc' as the default transport method in CloudRunHook, CloudRunAsyncHook, CloudRunExecuteJobOperator, and CloudRunJobFinishedTrigger classes. Adjusted related tests to reflect this change --- .../providers/google/cloud/hooks/cloud_run.py | 22 +++++++++++-------- .../google/cloud/operators/cloud_run.py | 4 ++-- .../google/cloud/triggers/cloud_run.py | 4 ++-- .../unit/google/cloud/hooks/test_cloud_run.py | 4 ++-- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py index f3290ec40673f..65134f471e483 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py @@ -19,7 +19,7 @@ import itertools from collections.abc import Iterable, Sequence -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal from google.cloud.run_v2 import ( CreateJobRequest, @@ -76,7 +76,7 @@ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - transport: str | None = None, + transport: Literal["rest", "grpc"] = "grpc", **kwargs, ) -> None: super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs) @@ -90,9 +90,11 @@ def get_conn(self): :return: Cloud Run Jobs client object. """ if self._client is None: - client_kwargs = {"credentials": self.get_credentials(), "client_info": CLIENT_INFO} - if self.transport is not None: - client_kwargs["transport"] = self.transport + client_kwargs = { + "credentials": self.get_credentials(), + "client_info": CLIENT_INFO, + "transport": self.transport, + } self._client = JobsClient(**client_kwargs) return self._client @@ -195,7 +197,7 @@ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - transport: str | None = None, + transport: Literal["rest", "grpc"] = "grpc", **kwargs, ): self._client: JobsAsyncClient | None = None @@ -207,9 +209,11 @@ def __init__( async def get_conn(self): if self._client is None: sync_hook = await self.get_sync_hook() - client_kwargs = {"credentials": sync_hook.get_credentials(), "client_info": CLIENT_INFO} - if self.transport is not None: - client_kwargs["transport"] = self.transport + client_kwargs = { + "credentials": sync_hook.get_credentials(), + "client_info": CLIENT_INFO, + "transport": self.transport, + } self._client = JobsAsyncClient(**client_kwargs) return self._client diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py index 30442dfb4674c..efc0bd4ca5bc7 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py @@ -18,7 +18,7 @@ from __future__ import annotations from collections.abc import Sequence -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import google.cloud.exceptions from google.api_core.exceptions import AlreadyExists @@ -292,7 +292,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), - transport: str | None = None, + transport: Literal["rest", "grpc"] = "grpc", **kwargs, ): super().__init__(**kwargs) diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py index 38afe677efb8b..92765500c979c 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py @@ -19,7 +19,7 @@ import asyncio from collections.abc import AsyncIterator, Sequence from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.google.cloud.hooks.cloud_run import CloudRunAsyncHook @@ -74,7 +74,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, polling_period_seconds: float = 10, timeout: float | None = None, - transport: str | None = None, + transport: Literal["rest", "grpc"] = "grpc", ): super().__init__() self.project_id = project_id diff --git a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py index 71303f35b6cc2..53cc732718fea 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py @@ -280,14 +280,14 @@ def test_get_conn_with_transport(self, mock_jobs_client): ) @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient") def test_get_conn_without_transport(self, mock_jobs_client): - """Test that JobsClient is created without transport when not specified.""" + """Test that JobsClient is created with default 'grpc' transport when not specified.""" hook = CloudRunHook() hook.get_credentials = self.dummy_get_credentials hook.get_conn() mock_jobs_client.assert_called_once() call_kwargs = mock_jobs_client.call_args[1] - assert "transport" not in call_kwargs + assert call_kwargs["transport"] == "grpc" def _mock_pager(self, number_of_jobs): mock_pager = [] From d0d6989227c663f67e7a49f3668148c87204a43b Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Mon, 12 Jan 2026 18:23:35 +0530 Subject: [PATCH 4/6] Update CloudRunJobFinishedTrigger to allow None as a transport option, defaulting to 'grpc' for backward compatibility. Adjusted related tests to accommodate this change --- .../airflow/providers/google/cloud/triggers/cloud_run.py | 6 ++++-- .../tests/unit/google/cloud/triggers/test_cloud_run.py | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py index 92765500c979c..ab4bbec20f0e4 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py @@ -74,7 +74,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, polling_period_seconds: float = 10, timeout: float | None = None, - transport: Literal["rest", "grpc"] = "grpc", + transport: Literal["rest", "grpc"] | None = "grpc", ): super().__init__() self.project_id = project_id @@ -146,8 +146,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) def _get_async_hook(self) -> CloudRunAsyncHook: + # Convert None to "grpc" for backward compatibility with old serialized triggers + transport = self.transport if self.transport is not None else "grpc" return CloudRunAsyncHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, - transport=self.transport, + transport=transport, ) diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py index 25f2f48831a29..3902a17885e0f 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py @@ -49,6 +49,7 @@ def trigger(): polling_period_seconds=POLL_SLEEP, timeout=TIMEOUT, impersonation_chain=IMPERSONATION_CHAIN, + transport=None, ) From 8b604fd9ba08e97fdbf9d83a506f7ff88460bc80 Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Wed, 14 Jan 2026 03:24:55 +0530 Subject: [PATCH 5/6] Refactor transport parameter in CloudRunHook, CloudRunAsyncHook, and CloudRunExecuteJobOperator to allow None as a valid option, enabling automatic transport selection. Updated related tests to validate this change --- .../providers/google/cloud/hooks/cloud_run.py | 12 +++++----- .../google/cloud/operators/cloud_run.py | 6 ++--- .../unit/google/cloud/hooks/test_cloud_run.py | 22 ++++--------------- 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py index 65134f471e483..f2f3ba417b1c0 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py @@ -68,15 +68,15 @@ class CloudRunHook(GoogleBaseHook): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. - Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment - (e.g., Docker containers with certain network configurations). + If set to None, a transport is chosen automatically. Use 'rest' if gRPC is not available + or fails in your environment (e.g., Docker containers with certain network configurations). """ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - transport: Literal["rest", "grpc"] = "grpc", + transport: Literal["rest", "grpc"] | None = None, **kwargs, ) -> None: super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs) @@ -187,8 +187,8 @@ class CloudRunAsyncHook(GoogleBaseAsyncHook): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. - Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment - (e.g., Docker containers with certain network configurations). + If set to None, a transport is chosen automatically. Use 'rest' if gRPC is not available + or fails in your environment (e.g., Docker containers with certain network configurations). """ sync_hook_class = CloudRunHook @@ -197,7 +197,7 @@ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - transport: Literal["rest", "grpc"] = "grpc", + transport: Literal["rest", "grpc"] | None = None, **kwargs, ): self._client: JobsAsyncClient | None = None diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py index efc0bd4ca5bc7..5c12dd4d7dafc 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py @@ -264,8 +264,8 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator): account from the list granting this role to the originating account (templated). :param deferrable: Run the operator in deferrable mode. :param transport: Optional. The transport to use for API requests. Can be 'rest' or 'grpc'. - Defaults to 'grpc'. Use 'rest' if gRPC is not available or fails in your environment - (e.g., Docker containers with certain network configurations). + If set to None, a transport is chosen automatically. Use 'rest' if gRPC is not available + or fails in your environment (e.g., Docker containers with certain network configurations). """ operator_extra_links = (CloudRunJobLoggingLink(),) @@ -292,7 +292,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), - transport: Literal["rest", "grpc"] = "grpc", + transport: Literal["rest", "grpc"] | None = None, **kwargs, ): super().__init__(**kwargs) diff --git a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py index 53cc732718fea..4a8150459c4a0 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py @@ -264,30 +264,16 @@ def test_delete_job(self, mock_batch_service_client, cloud_run_hook): new=mock_base_gcp_hook_default_project_id, ) @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient") - def test_get_conn_with_transport(self, mock_jobs_client): + @pytest.mark.parametrize(("transport", "expected_transport"), [("rest", "rest"), (None, None)]) + def test_get_conn_with_transport(self, mock_jobs_client, transport, expected_transport): """Test that transport parameter is passed to JobsClient.""" - hook = CloudRunHook(transport="rest") + hook = CloudRunHook(transport=transport) hook.get_credentials = self.dummy_get_credentials hook.get_conn() mock_jobs_client.assert_called_once() call_kwargs = mock_jobs_client.call_args[1] - assert call_kwargs["transport"] == "rest" - - @mock.patch( - "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", - new=mock_base_gcp_hook_default_project_id, - ) - @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient") - def test_get_conn_without_transport(self, mock_jobs_client): - """Test that JobsClient is created with default 'grpc' transport when not specified.""" - hook = CloudRunHook() - hook.get_credentials = self.dummy_get_credentials - hook.get_conn() - - mock_jobs_client.assert_called_once() - call_kwargs = mock_jobs_client.call_args[1] - assert call_kwargs["transport"] == "grpc" + assert call_kwargs["transport"] == expected_transport def _mock_pager(self, number_of_jobs): mock_pager = [] From c28d80cdb31af13356186e2842b96cf693a71506 Mon Sep 17 00:00:00 2001 From: Arjav Patel Date: Sat, 24 Jan 2026 23:51:31 +0530 Subject: [PATCH 6/6] Refactor transport handling in CloudRunJobFinishedTrigger to default to 'grpc' when None is provided, ensuring backward compatibility. Updated related logic for consistency --- .../airflow/providers/google/cloud/triggers/cloud_run.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py index ab4bbec20f0e4..8261edd416a3e 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py @@ -74,7 +74,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, polling_period_seconds: float = 10, timeout: float | None = None, - transport: Literal["rest", "grpc"] | None = "grpc", + transport: Literal["rest", "grpc"] | None = None, ): super().__init__() self.project_id = project_id @@ -146,10 +146,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) def _get_async_hook(self) -> CloudRunAsyncHook: - # Convert None to "grpc" for backward compatibility with old serialized triggers - transport = self.transport if self.transport is not None else "grpc" return CloudRunAsyncHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, - transport=transport, + transport=self.transport or "grpc", )