From 3df6127657474e357011ee1920370ee92ab54fbe Mon Sep 17 00:00:00 2001 From: Ihor Lukianov Date: Wed, 17 Jun 2026 13:12:52 +0200 Subject: [PATCH 1/3] Fix Kubernetes Executor pods deletion storm --- .../executors/kubernetes_executor.py | 16 +++++++- .../executors/test_kubernetes_executor.py | 38 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 0dcc01537cb78..c213f6a742c1d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -300,8 +300,18 @@ def sync(self) -> None: finally: self.result_queue.task_done() - for result in self.completed: + if self.completed: + still_pending: set[KubernetesResults] = set() + for result in self.completed: + try: self._change_state(result) + except Exception: + self.log.exception( + "Exception when attempting to change state of adopted completed pod %s, will retry.", + result, + ) + still_pending.add(result) + self.completed = still_pending from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion @@ -813,11 +823,13 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: continue ti_id = annotations_to_key(pod.metadata.annotations) + pod_name = pod.metadata.name + self.completed = {result for result in self.completed if result.pod_name != pod_name} self.completed.add( KubernetesResults( key=ti_id, state="completed", - pod_name=pod.metadata.name, + pod_name=pod_name, namespace=pod.metadata.namespace, resource_version=pod.metadata.resource_version, failure_details=None, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..901b9644d9294 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1536,6 +1536,44 @@ def test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, sess "_alive_other_scheduler_job_ids closed/detached the caller's scoped session" ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod" + ) + def test_sync_processes_completed_pods_once( + self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher + ): + """Adopted completed pods must not be re-deleted for every result-queue item.""" + executor = self.kubernetes_executor + executor.start() + try: + completed_key = TaskInstanceKey(dag_id="dag", task_id="completed", run_id="run_id", try_number=1) + queue_key = TaskInstanceKey(dag_id="dag", task_id="queued", run_id="run_id", try_number=1) + executor.completed = { + KubernetesResults( + completed_key, + "completed", + "completed-pod", + "default", + "1", + None, + ) + } + executor.result_queue.put( + KubernetesResults(queue_key, None, "queue-pod", "default", "2", None) + ) + executor.result_queue.put( + KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None) + ) + + executor.sync() + + assert mock_delete_pod.call_count == 3 + assert executor.completed == set() + finally: + executor.end() + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") def test_not_adopt_unassigned_task(self, mock_kube_client): """ From a61ec7addd5b8a4b9f741d35c861255c1d79db99 Mon Sep 17 00:00:00 2001 From: Ihor Lukianov Date: Thu, 18 Jun 2026 12:34:29 +0200 Subject: [PATCH 2/3] Used dict for better performance; Added UT for delete_worker_pods=False --- .../executors/kubernetes_executor.py | 26 ++++----- .../executors/test_kubernetes_executor.py | 58 ++++++++++++++++++- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index c213f6a742c1d..03362c3c1687d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -114,7 +114,7 @@ def __init__(self, *args, **kwargs): self.task_publish_max_retries = self.conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 ) - self.completed: set[KubernetesResults] = set() + self.completed: dict[tuple[str, str], KubernetesResults] = {} self.create_pods_after: datetime | None = None def _list_pods(self, query_kwargs): @@ -301,8 +301,8 @@ def sync(self) -> None: self.result_queue.task_done() if self.completed: - still_pending: set[KubernetesResults] = set() - for result in self.completed: + still_pending: dict[tuple[str, str], KubernetesResults] = {} + for pod_key, result in self.completed.items(): try: self._change_state(result) except Exception: @@ -310,7 +310,7 @@ def sync(self) -> None: "Exception when attempting to change state of adopted completed pod %s, will retry.", result, ) - still_pending.add(result) + still_pending[pod_key] = result self.completed = still_pending from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion @@ -824,16 +824,14 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: ti_id = annotations_to_key(pod.metadata.annotations) pod_name = pod.metadata.name - self.completed = {result for result in self.completed if result.pod_name != pod_name} - self.completed.add( - KubernetesResults( - key=ti_id, - state="completed", - pod_name=pod_name, - namespace=pod.metadata.namespace, - resource_version=pod.metadata.resource_version, - failure_details=None, - ) + namespace = pod.metadata.namespace + self.completed[(namespace, pod_name)] = KubernetesResults( + key=ti_id, + state="completed", + pod_name=pod_name, + namespace=namespace, + resource_version=pod.metadata.resource_version, + failure_details=None, ) def _flush_task_queue(self) -> None: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 901b9644d9294..8c7aad5dd6b92 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1397,7 +1397,7 @@ def get_annotations(pod_name): ], any_order=True, ) - assert {k8s_res.key for k8s_res in executor.completed} == expected_running_ti_keys + assert {k8s_res.key for k8s_res in executor.completed.values()} == expected_running_ti_keys @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") @@ -1536,6 +1536,7 @@ def test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, sess "_alive_other_scheduler_job_ids closed/detached the caller's scoped session" ) + @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") @mock.patch( @@ -1551,7 +1552,7 @@ def test_sync_processes_completed_pods_once( completed_key = TaskInstanceKey(dag_id="dag", task_id="completed", run_id="run_id", try_number=1) queue_key = TaskInstanceKey(dag_id="dag", task_id="queued", run_id="run_id", try_number=1) executor.completed = { - KubernetesResults( + ("default", "completed-pod"): KubernetesResults( completed_key, "completed", "completed-pod", @@ -1570,7 +1571,58 @@ def test_sync_processes_completed_pods_once( executor.sync() assert mock_delete_pod.call_count == 3 - assert executor.completed == set() + assert executor.completed == {} + finally: + executor.end() + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler" + ) + def test_sync_processes_completed_pods_once_without_deletion( + self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher + ): + """Adopted completed pods must not be re-patched for every result-queue item.""" + mock_delete_pod = mock_kubescheduler.return_value.delete_pod + mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done + executor = self.kubernetes_executor + executor.kube_config.delete_worker_pods = False + executor.start() + try: + completed_key = TaskInstanceKey(dag_id="dag", task_id="completed", run_id="run_id", try_number=1) + queue_key = TaskInstanceKey(dag_id="dag", task_id="queued", run_id="run_id", try_number=1) + executor.completed = { + ("default", "completed-pod"): KubernetesResults( + completed_key, + "completed", + "completed-pod", + "default", + "1", + None, + ) + } + executor.result_queue.put( + KubernetesResults(queue_key, None, "queue-pod", "default", "2", None) + ) + executor.result_queue.put( + KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None) + ) + + executor.sync() + + mock_delete_pod.assert_not_called() + assert mock_patch_pod.call_count == 3 + mock_patch_pod.assert_has_calls( + [ + mock.call(pod_name="completed-pod", namespace="default"), + mock.call(pod_name="queue-pod", namespace="default"), + mock.call(pod_name="queue-pod-2", namespace="default"), + ], + any_order=True, + ) + assert executor.completed == {} finally: executor.end() From 7f94c04d5f21e8d7daf775e6974347974c66df5e Mon Sep 17 00:00:00 2001 From: Ihor Lukianov Date: Thu, 18 Jun 2026 15:18:44 +0200 Subject: [PATCH 3/3] Fix formatting --- .../executors/test_kubernetes_executor.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 8c7aad5dd6b92..c82b97af85978 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1561,12 +1561,8 @@ def test_sync_processes_completed_pods_once( None, ) } - executor.result_queue.put( - KubernetesResults(queue_key, None, "queue-pod", "default", "2", None) - ) - executor.result_queue.put( - KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None) - ) + executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod", "default", "2", None)) + executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None)) executor.sync() @@ -1603,12 +1599,8 @@ def test_sync_processes_completed_pods_once_without_deletion( None, ) } - executor.result_queue.put( - KubernetesResults(queue_key, None, "queue-pod", "default", "2", None) - ) - executor.result_queue.put( - KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None) - ) + executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod", "default", "2", None)) + executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None)) executor.sync()