From 3fd986c99915f1c0f6d2fce0d1c231cca7cfe6fb Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 22 Jun 2026 10:18:53 +0100 Subject: [PATCH 1/4] Make cncf.kubernetes model deserialization picklable in-cluster The kubernetes client (v36) attaches the process-global in-cluster Configuration to every model it deserializes, and that Configuration's refresh_api_key_hook is an unpicklable local closure. Any deserialized model that later gets pickled -- for example a pod_override placed on the KubernetesExecutor multiprocessing queue -- then crashes. Deserialize through an ApiClient built with a fresh Configuration in the provider's model-deserialization paths so the resulting models (and every nested object) stay picklable regardless of the kubernetes client version: PodGenerator.deserialize_model_dict, KubernetesJobOperator.deserialize_job_template_file, and the backcompat _convert_from_dict converter. --- .../backcompat/backwards_compat_converters.py | 7 +++- .../cncf/kubernetes/operators/job.py | 7 +++- .../cncf/kubernetes/pod_generator.py | 9 +++- .../test_backwards_compat_converters.py | 29 +++++++++++++ .../cncf/kubernetes/operators/test_job.py | 42 +++++++++++++++++++ .../cncf/kubernetes/test_pod_generator.py | 33 +++++++++++++++ 6 files changed, 121 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index 88d16170163e5..6ea2baa42bb7f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -18,7 +18,7 @@ from __future__ import annotations -from kubernetes.client import ApiClient, models as k8s +from kubernetes.client import ApiClient, Configuration, models as k8s from airflow.providers.common.compat.sdk import AirflowException @@ -36,7 +36,10 @@ def _convert_from_dict(obj, new_class): if isinstance(obj, new_class): return obj if isinstance(obj, dict): - api_client = ApiClient() + # A fresh Configuration() keeps the deserialized model picklable: in-cluster, the kubernetes + # client (v36) would otherwise stamp the global Configuration, whose refresh_api_key_hook is + # an unpicklable local closure, onto the model and every nested object. + api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(obj, new_class) raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}") diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py index 0e8540d815ce0..fb0776e27aff5 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py @@ -28,7 +28,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Literal -from kubernetes.client import BatchV1Api, models as k8s +from kubernetes.client import BatchV1Api, Configuration, models as k8s from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException @@ -378,7 +378,10 @@ def deserialize_job_template_file(path: str) -> k8s.V1Job: job = None log.warning("Template file %s does not exist", path) - api_client = ApiClient() + # A fresh Configuration() keeps the deserialized model picklable: in-cluster, the kubernetes + # client (v36) would otherwise stamp the global Configuration, whose refresh_api_key_hook is + # an unpicklable local closure, onto the model and every nested object. + api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(job, k8s.V1Job) def on_kill(self) -> None: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index 7e5dc728d775f..ca499d363fc77 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -34,7 +34,7 @@ from typing import TYPE_CHECKING from dateutil import parser -from kubernetes.client import models as k8s +from kubernetes.client import Configuration, models as k8s from kubernetes.client.api_client import ApiClient from airflow.exceptions import ( @@ -568,10 +568,15 @@ def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod: ``_ApiClient__deserialize_model`` from the kubernetes client. This issue is tracked here; https://github.com/kubernetes-client/python/issues/977. + A fresh ``Configuration`` is passed so that neither the pod nor any nested model captures the + process-global in-cluster ``Configuration``. In-cluster, that global carries a + ``refresh_api_key_hook`` local closure which ``pickle`` cannot serialize, and which would + otherwise break pickling a ``pod_override`` onto the KubernetesExecutor multiprocessing queue. + :param pod_dict: Serialized dict of k8s.V1Pod object :return: De-serialized k8s.V1Pod """ - api_client = ApiClient() + api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py index 0c25565821611..ea3d79691e9d2 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py @@ -102,6 +102,35 @@ def test_convert_from_dict_with_invalid_type(): assert str(exc_info.value) == "Expected dict or , got " +def test_convert_from_dict_is_picklable_in_cluster(monkeypatch): + """A model deserialized from a dict must not capture the unpicklable in-cluster Configuration. + + In-cluster, the kubernetes client installs a process-global default ``Configuration`` whose + ``refresh_api_key_hook`` is an unpicklable local closure. ``_convert_from_dict`` must deserialize + through a fresh ``Configuration`` so the model (and every nested object) stays picklable. + """ + import pickle + + from kubernetes.client import Configuration + + def _make_unpicklable_hook(): + def _refresh_api_key(config): + return None + + return _refresh_api_key + + dirty = Configuration() + dirty.refresh_api_key_hook = _make_unpicklable_hook() + monkeypatch.setattr(Configuration, "_default", dirty, raising=False) + + result = _convert_from_dict({"name": "vol", "emptyDir": {}}, k8s.V1Volume) + + assert isinstance(result, k8s.V1Volume) + pickle.dumps(result) + assert result.local_vars_configuration.refresh_api_key_hook is None + assert result.empty_dir.local_vars_configuration.refresh_api_key_hook is None + + # testcase of convert_volume() function @patch("airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters._convert_kube_model_object") def test_convert_volume_normal_value(mock_convert_kube_model_object): diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py index a0f3049cce5a6..a0222c67d078f 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py @@ -200,6 +200,48 @@ def test_backoff_limit_correctly_set(self, clean_dags_dagruns_and_dagbundles): job = k.build_job_request_obj(create_context(k)) assert job.spec.backoff_limit == 6 + def test_deserialize_job_template_file_is_picklable_in_cluster(self, tmp_path, monkeypatch): + """A job deserialized from a template file must not capture the in-cluster Configuration. + + In-cluster, the kubernetes client installs a process-global default ``Configuration`` whose + ``refresh_api_key_hook`` is an unpicklable local closure. ``deserialize_job_template_file`` must + deserialize through a fresh ``Configuration`` so the job (and every nested model) stays picklable. + """ + import pickle + + from kubernetes.client import Configuration + + def _make_unpicklable_hook(): + def _refresh_api_key(config): + return None + + return _refresh_api_key + + dirty = Configuration() + dirty.refresh_api_key_hook = _make_unpicklable_hook() + monkeypatch.setattr(Configuration, "_default", dirty, raising=False) + + template = tmp_path / "job.yaml" + template.write_text( + "apiVersion: batch/v1\n" + "kind: Job\n" + "metadata:\n" + " name: test-job\n" + "spec:\n" + " template:\n" + " spec:\n" + " containers:\n" + " - name: base\n" + " image: airflow:3\n" + ) + + job = KubernetesJobOperator.deserialize_job_template_file(template.as_posix()) + + assert isinstance(job, k8s.V1Job) + pickle.dumps(job) + assert job.local_vars_configuration.refresh_api_key_hook is None + assert job.spec.template.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None + def test_completion_mode_correctly_set(self, clean_dags_dagruns_and_dagbundles): k = KubernetesJobOperator( task_id="task", diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py index db969eec3f796..cd112395d9b6c 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py @@ -698,6 +698,39 @@ def test_deserialize_non_existent_model_file(self, caplog, tmp_path): assert len(caplog.records) == 1 assert "non_existent.yaml does not exist" in caplog.text + def test_deserialize_model_dict_is_picklable_in_cluster(self, monkeypatch): + """A deserialized pod must not capture the unpicklable in-cluster Configuration. + + In-cluster, the kubernetes client installs a process-global default ``Configuration`` whose + ``refresh_api_key_hook`` is an unpicklable local closure. ``deserialize_model_dict`` must + round-trip through a fresh ``Configuration`` so the pod (and every nested model) stays + picklable onto the KubernetesExecutor multiprocessing queue. + """ + import pickle + + from kubernetes.client import Configuration + + def _make_unpicklable_hook(): + def _refresh_api_key(config): + return None + + return _refresh_api_key + + dirty = Configuration() + dirty.refresh_api_key_hook = _make_unpicklable_hook() + monkeypatch.setattr(Configuration, "_default", dirty, raising=False) + + pod_dict = { + "metadata": {"name": "test-pod"}, + "spec": {"containers": [{"name": "base", "image": "airflow:3"}]}, + } + pod = PodGenerator.deserialize_model_dict(pod_dict) + + assert isinstance(pod, k8s.V1Pod) + pickle.dumps(pod) + assert pod.local_vars_configuration.refresh_api_key_hook is None + assert pod.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None + @pytest.mark.parametrize( "input", ( From f87f0baa536c8e1688723b78f89e6de2aab04c66 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 23 Jun 2026 08:55:43 +0100 Subject: [PATCH 2/4] fixup! Make cncf.kubernetes model deserialization picklable in-cluster --- .../cncf/kubernetes/backcompat/backwards_compat_converters.py | 4 +--- .../src/airflow/providers/cncf/kubernetes/operators/job.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index 6ea2baa42bb7f..409496efe14b9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -36,9 +36,7 @@ def _convert_from_dict(obj, new_class): if isinstance(obj, new_class): return obj if isinstance(obj, dict): - # A fresh Configuration() keeps the deserialized model picklable: in-cluster, the kubernetes - # client (v36) would otherwise stamp the global Configuration, whose refresh_api_key_hook is - # an unpicklable local closure, onto the model and every nested object. + # A fresh Configuration() keeps the deserialized model picklabled api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(obj, new_class) raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}") diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py index fb0776e27aff5..e7005e1bd5705 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py @@ -378,9 +378,7 @@ def deserialize_job_template_file(path: str) -> k8s.V1Job: job = None log.warning("Template file %s does not exist", path) - # A fresh Configuration() keeps the deserialized model picklable: in-cluster, the kubernetes - # client (v36) would otherwise stamp the global Configuration, whose refresh_api_key_hook is - # an unpicklable local closure, onto the model and every nested object. + # A fresh Configuration() keeps the deserialized model picklable api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(job, k8s.V1Job) From 72baa87325b11f324a998f521ac799689d3387c9 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 23 Jun 2026 09:03:43 +0100 Subject: [PATCH 3/4] move imports to top level --- .../backcompat/test_backwards_compat_converters.py | 6 ++---- .../tests/unit/cncf/kubernetes/operators/test_job.py | 6 ++---- .../tests/unit/cncf/kubernetes/test_pod_generator.py | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py index ea3d79691e9d2..0f11e9e1d2ed6 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py @@ -16,10 +16,11 @@ # under the License. from __future__ import annotations +import pickle from unittest.mock import Mock, patch import pytest -from kubernetes.client import models as k8s +from kubernetes.client import Configuration, models as k8s from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import ( _convert_from_dict, @@ -109,9 +110,6 @@ def test_convert_from_dict_is_picklable_in_cluster(monkeypatch): ``refresh_api_key_hook`` is an unpicklable local closure. ``_convert_from_dict`` must deserialize through a fresh ``Configuration`` so the model (and every nested object) stays picklable. """ - import pickle - - from kubernetes.client import Configuration def _make_unpicklable_hook(): def _refresh_api_key(config): diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py index a0222c67d078f..754560e1869c2 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import pickle import random import re import string @@ -24,7 +25,7 @@ import pendulum import pytest -from kubernetes.client import ApiClient, models as k8s +from kubernetes.client import ApiClient, Configuration, models as k8s from kubernetes.client.rest import ApiException from airflow.exceptions import AirflowProviderDeprecationWarning @@ -207,9 +208,6 @@ def test_deserialize_job_template_file_is_picklable_in_cluster(self, tmp_path, m ``refresh_api_key_hook`` is an unpicklable local closure. ``deserialize_job_template_file`` must deserialize through a fresh ``Configuration`` so the job (and every nested model) stays picklable. """ - import pickle - - from kubernetes.client import Configuration def _make_unpicklable_hook(): def _refresh_api_key(config): diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py index cd112395d9b6c..f730761c732ab 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import pickle import re from unittest import mock from unittest.mock import MagicMock @@ -23,7 +24,7 @@ import pendulum import pytest from dateutil import parser -from kubernetes.client import ApiClient, models as k8s +from kubernetes.client import ApiClient, Configuration, models as k8s from airflow import __version__ from airflow.exceptions import AirflowConfigException @@ -706,9 +707,6 @@ def test_deserialize_model_dict_is_picklable_in_cluster(self, monkeypatch): round-trip through a fresh ``Configuration`` so the pod (and every nested model) stays picklable onto the KubernetesExecutor multiprocessing queue. """ - import pickle - - from kubernetes.client import Configuration def _make_unpicklable_hook(): def _refresh_api_key(config): From 2a2559bae1de9ec3ce2ca861a54246ece78ac9cd Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 23 Jun 2026 13:55:55 +0100 Subject: [PATCH 4/4] Address review feedback on provider deserialization fix Keep the picklability rationale only in PodGenerator.deserialize_model_dict (drop the duplicated comments at the job operator and converter call sites), and replace the unnecessary closure-returning-closure test helper with a directly-defined local hook (already unpicklable on its own). --- .../kubernetes/backcompat/backwards_compat_converters.py | 1 - .../airflow/providers/cncf/kubernetes/operators/job.py | 1 - .../backcompat/test_backwards_compat_converters.py | 9 +++------ .../tests/unit/cncf/kubernetes/operators/test_job.py | 9 +++------ .../tests/unit/cncf/kubernetes/test_pod_generator.py | 9 +++------ 5 files changed, 9 insertions(+), 20 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index 409496efe14b9..0ca600b4466c9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -36,7 +36,6 @@ def _convert_from_dict(obj, new_class): if isinstance(obj, new_class): return obj if isinstance(obj, dict): - # A fresh Configuration() keeps the deserialized model picklabled api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(obj, new_class) raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}") diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py index e7005e1bd5705..e888159c73923 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py @@ -378,7 +378,6 @@ def deserialize_job_template_file(path: str) -> k8s.V1Job: job = None log.warning("Template file %s does not exist", path) - # A fresh Configuration() keeps the deserialized model picklable api_client = ApiClient(configuration=Configuration()) return api_client._ApiClient__deserialize_model(job, k8s.V1Job) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py index 0f11e9e1d2ed6..f3a9dc33a850a 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py @@ -111,14 +111,11 @@ def test_convert_from_dict_is_picklable_in_cluster(monkeypatch): through a fresh ``Configuration`` so the model (and every nested object) stays picklable. """ - def _make_unpicklable_hook(): - def _refresh_api_key(config): - return None - - return _refresh_api_key + def _refresh_api_key(config): + return None dirty = Configuration() - dirty.refresh_api_key_hook = _make_unpicklable_hook() + dirty.refresh_api_key_hook = _refresh_api_key monkeypatch.setattr(Configuration, "_default", dirty, raising=False) result = _convert_from_dict({"name": "vol", "emptyDir": {}}, k8s.V1Volume) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py index 754560e1869c2..d99e3e294fb36 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py @@ -209,14 +209,11 @@ def test_deserialize_job_template_file_is_picklable_in_cluster(self, tmp_path, m deserialize through a fresh ``Configuration`` so the job (and every nested model) stays picklable. """ - def _make_unpicklable_hook(): - def _refresh_api_key(config): - return None - - return _refresh_api_key + def _refresh_api_key(config): + return None dirty = Configuration() - dirty.refresh_api_key_hook = _make_unpicklable_hook() + dirty.refresh_api_key_hook = _refresh_api_key monkeypatch.setattr(Configuration, "_default", dirty, raising=False) template = tmp_path / "job.yaml" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py index f730761c732ab..cfeea2d6163bc 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py @@ -708,14 +708,11 @@ def test_deserialize_model_dict_is_picklable_in_cluster(self, monkeypatch): picklable onto the KubernetesExecutor multiprocessing queue. """ - def _make_unpicklable_hook(): - def _refresh_api_key(config): - return None - - return _refresh_api_key + def _refresh_api_key(config): + return None dirty = Configuration() - dirty.refresh_api_key_hook = _make_unpicklable_hook() + dirty.refresh_api_key_hook = _refresh_api_key monkeypatch.setattr(Configuration, "_default", dirty, raising=False) pod_dict = {