From 8cc705e512788cdbaa11fe197e4ed5bb21781ff6 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 26 Jun 2026 13:50:51 +0300 Subject: [PATCH] Fix GKE provider 401 on kubernetes client 36.x kubernetes-client 36.x renamed the bearer-token auth key from 'authorization' to 'BearerToken'. The GKE connection hand-builds a Configuration and registered the token/prefix only under 'authorization', so on 36.x the 'Bearer' prefix is dropped and requests go out with the raw token, which the cluster rejects with 401. Register the token and prefix under both keys so authentication works on client 35.x and 36.x alike, which lets the provider support 36.x without downgrading the kubernetes client (and reintroducing the NO_PROXY issue 36.x fixed). --- .../google/cloud/hooks/kubernetes_engine.py | 13 ++++++--- .../cloud/hooks/test_kubernetes_engine.py | 27 +++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py index 1a935438d102e..a721578132edb 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py @@ -80,13 +80,20 @@ def get_conn(self) -> client.ApiClient: return client.ApiClient(configuration) def _refresh_api_key_hook(self, configuration: client.configuration.Configuration): - configuration.api_key = {"authorization": self._get_token(self._credentials)} + configuration.api_key = self._bearer_api_key(self._get_token(self._credentials)) + + @staticmethod + def _bearer_api_key(token: str) -> dict[str, str]: + # kubernetes-client 36.x renamed the bearer auth key 'authorization' -> 'BearerToken' + # (https://github.com/kubernetes-client/python/issues/2582). Register both so the 'Bearer' + # prefix is applied on client 35.x and 36.x alike; otherwise 36.x sends the raw token -> 401. + return {"authorization": token, "BearerToken": token} def _get_config(self) -> client.configuration.Configuration: configuration = client.Configuration( host=self._cluster_url, - api_key_prefix={"authorization": "Bearer"}, - api_key={"authorization": self._get_token(self._credentials)}, + api_key_prefix={"authorization": "Bearer", "BearerToken": "Bearer"}, + api_key=self._bearer_api_key(self._get_token(self._credentials)), ) if not self.use_dns_endpoint: configuration.ssl_ca_cert = FileOrData( diff --git a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py index 3121ee1b7da56..f3a1a41620416 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py @@ -31,6 +31,7 @@ from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.google.cloud.hooks.kubernetes_engine import ( GKEAsyncHook, + GKEClusterConnection, GKEHook, GKEKubernetesAsyncHook, GKEKubernetesHook, @@ -486,6 +487,32 @@ def test_apply_from_yaml_file(self, mock_get_conn, mock_super, api_client, expec mock_super.return_value.apply_from_yaml_file.assert_called_once_with(**expected_kwargs) +class TestGKEClusterConnection: + @pytest.mark.parametrize("expired", [False, True]) + def test_registers_bearer_token_for_client_35_and_36(self, expired): + # The bearer token and its "Bearer" prefix must be registered under both the client-35.x key + # ('authorization') and the client-36.x key ('BearerToken', renamed in + # https://github.com/kubernetes-client/python/issues/2582). Asserting the registration + # directly verifies both-version support regardless of which client CI installed; the + # auth_settings() check then confirms the installed client emits the prefixed header. + credentials = mock.MagicMock() + credentials.token = "the-token" + credentials.expired = expired + credentials.refresh = lambda request: setattr(credentials, "token", "the-token") + conn = GKEClusterConnection( + cluster_url="https://cluster", + ssl_ca_cert=None, + credentials=credentials, + use_dns_endpoint=True, + ) + + config = conn.get_conn().configuration + + assert config.api_key == {"authorization": "the-token", "BearerToken": "the-token"} + assert config.api_key_prefix == {"authorization": "Bearer", "BearerToken": "Bearer"} + assert config.auth_settings()["BearerToken"]["value"] == "Bearer the-token" + + class TestGKEKubernetesAsyncHook: @staticmethod def make_mock_awaitable(mock_obj, result=None):