diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py index 1a935438d102e..a721578132edb 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py @@ -80,13 +80,20 @@ def get_conn(self) -> client.ApiClient: return client.ApiClient(configuration) def _refresh_api_key_hook(self, configuration: client.configuration.Configuration): - configuration.api_key = {"authorization": self._get_token(self._credentials)} + configuration.api_key = self._bearer_api_key(self._get_token(self._credentials)) + + @staticmethod + def _bearer_api_key(token: str) -> dict[str, str]: + # kubernetes-client 36.x renamed the bearer auth key 'authorization' -> 'BearerToken' + # (https://github.com/kubernetes-client/python/issues/2582). Register both so the 'Bearer' + # prefix is applied on client 35.x and 36.x alike; otherwise 36.x sends the raw token -> 401. + return {"authorization": token, "BearerToken": token} def _get_config(self) -> client.configuration.Configuration: configuration = client.Configuration( host=self._cluster_url, - api_key_prefix={"authorization": "Bearer"}, - api_key={"authorization": self._get_token(self._credentials)}, + api_key_prefix={"authorization": "Bearer", "BearerToken": "Bearer"}, + api_key=self._bearer_api_key(self._get_token(self._credentials)), ) if not self.use_dns_endpoint: configuration.ssl_ca_cert = FileOrData( diff --git a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py index 3121ee1b7da56..f3a1a41620416 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py @@ -31,6 +31,7 @@ from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.google.cloud.hooks.kubernetes_engine import ( GKEAsyncHook, + GKEClusterConnection, GKEHook, GKEKubernetesAsyncHook, GKEKubernetesHook, @@ -486,6 +487,32 @@ def test_apply_from_yaml_file(self, mock_get_conn, mock_super, api_client, expec mock_super.return_value.apply_from_yaml_file.assert_called_once_with(**expected_kwargs) +class TestGKEClusterConnection: + @pytest.mark.parametrize("expired", [False, True]) + def test_registers_bearer_token_for_client_35_and_36(self, expired): + # The bearer token and its "Bearer" prefix must be registered under both the client-35.x key + # ('authorization') and the client-36.x key ('BearerToken', renamed in + # https://github.com/kubernetes-client/python/issues/2582). Asserting the registration + # directly verifies both-version support regardless of which client CI installed; the + # auth_settings() check then confirms the installed client emits the prefixed header. + credentials = mock.MagicMock() + credentials.token = "the-token" + credentials.expired = expired + credentials.refresh = lambda request: setattr(credentials, "token", "the-token") + conn = GKEClusterConnection( + cluster_url="https://cluster", + ssl_ca_cert=None, + credentials=credentials, + use_dns_endpoint=True, + ) + + config = conn.get_conn().configuration + + assert config.api_key == {"authorization": "the-token", "BearerToken": "the-token"} + assert config.api_key_prefix == {"authorization": "Bearer", "BearerToken": "Bearer"} + assert config.auth_settings()["BearerToken"]["value"] == "Bearer the-token" + + class TestGKEKubernetesAsyncHook: @staticmethod def make_mock_awaitable(mock_obj, result=None):