From 95c4c7091624f958d096eae792a4171573522a9f Mon Sep 17 00:00:00 2001 From: Stephen Tyndall Date: Wed, 10 Jun 2026 16:55:35 -0400 Subject: [PATCH 1/4] Avoid duplicate completed deletions by draining completed set, moving completed updates out of result_queue loop --- .../kubernetes/executors/kubernetes_executor.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 0dcc01537cb78..6ca463ed422d0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -300,8 +300,20 @@ def sync(self) -> None: finally: self.result_queue.task_done() - for result in self.completed: - self._change_state(result) + failed = set() + while self.completed: + result = self.completed.pop() + try: + self._change_state(result) + except Exception as e: + self.log.exception( + "Exception: %s when attempting to change state of %s to %s, re-queueing.", + e, + result, + result.state, + ) + failed.add(result) + self.completed = failed from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion From abfd64a3c518a5e8fdc13c8c47081fa987ae0198 Mon Sep 17 00:00:00 2001 From: Stephen Tyndall Date: Wed, 10 Jun 2026 16:56:12 -0400 Subject: [PATCH 2/4] add tests to verify completed gets drained, requeues of failed state updates --- .../executors/test_kubernetes_executor.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..f63762cdc543d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1507,6 +1507,72 @@ def test_adopt_completed_pods_single_scheduler_unchanged( header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) + @staticmethod + def _completed_result(name): + """Build a KubernetesResults entry as ``_adopt_completed_pods`` would add to ``self.completed``.""" + return KubernetesResults( + key=TaskInstanceKey("dag", name, "run_id", 1, -1), + state="completed", + pod_name=name, + namespace="somens", + resource_version="0", + failure_details=None, + ) + + def _make_sync_ready_executor(self): + """An executor whose ``sync()`` exercises only the ``self.completed`` drain. + + Both queues are mocked so their ``get_nowait`` raises ``Empty`` immediately (as an + empty queue would), and ``kube_scheduler`` is mocked, so the only behavior under test + is how ``sync()`` processes ``self.completed``. + """ + from queue import Empty + + executor = self.kubernetes_executor + executor.scheduler_job_id = "modified" + executor.kube_client = mock.MagicMock() + executor.kube_scheduler = mock.MagicMock() + executor.result_queue = mock.MagicMock() + executor.result_queue.get_nowait.side_effect = Empty + executor.task_queue = mock.MagicMock() + executor.task_queue.get_nowait.side_effect = Empty + executor.create_pods_after = None + executor.kube_config.worker_pods_creation_batch_size = 1 + return executor + + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state") + def test_sync_drains_and_clears_completed(self, mock_change_state): + """``sync()`` processes every entry in ``self.completed`` exactly once and clears the set.""" + executor = self._make_sync_ready_executor() + results = {self._completed_result(name) for name in ("one", "two", "three")} + executor.completed = set(results) + + executor.sync() + + assert {call.args[0] for call in mock_change_state.call_args_list} == results + assert mock_change_state.call_count == len(results) + assert executor.completed == set() + + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state") + def test_sync_retains_completed_entry_on_change_state_failure(self, mock_change_state): + """A ``self.completed`` entry whose ``_change_state`` raises is retained; the rest still drain.""" + executor = self._make_sync_ready_executor() + good_one = self._completed_result("good-one") + good_two = self._completed_result("good-two") + bad = self._completed_result("bad") + executor.completed = {good_one, good_two, bad} + + def fail_only_bad(result): + if result is bad: + raise RuntimeError("boom") + + mock_change_state.side_effect = fail_only_bad + + executor.sync() + + assert mock_change_state.call_count == 3 + assert executor.completed == {bad} + @pytest.mark.db_test def test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, session): """``_alive_other_scheduler_job_ids`` must use an independent (non-scoped) session. From 90e3350ab4eab7b6612245d7be882f0ca2b0b9ea Mon Sep 17 00:00:00 2001 From: Stephen Tyndall Date: Fri, 12 Jun 2026 09:20:08 -0400 Subject: [PATCH 3/4] lint --- .../cncf/kubernetes/executors/test_kubernetes_executor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index f63762cdc543d..a7814a25ef5d5 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1540,7 +1540,9 @@ def _make_sync_ready_executor(self): executor.kube_config.worker_pods_creation_batch_size = 1 return executor - @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state" + ) def test_sync_drains_and_clears_completed(self, mock_change_state): """``sync()`` processes every entry in ``self.completed`` exactly once and clears the set.""" executor = self._make_sync_ready_executor() @@ -1553,7 +1555,9 @@ def test_sync_drains_and_clears_completed(self, mock_change_state): assert mock_change_state.call_count == len(results) assert executor.completed == set() - @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state" + ) def test_sync_retains_completed_entry_on_change_state_failure(self, mock_change_state): """A ``self.completed`` entry whose ``_change_state`` raises is retained; the rest still drain.""" executor = self._make_sync_ready_executor() From e9ebbb57ad0ac589b57e6263120fc9e4e7400e36 Mon Sep 17 00:00:00 2001 From: Stephen Tyndall Date: Tue, 16 Jun 2026 16:11:14 -0400 Subject: [PATCH 4/4] turn test helper functions into pytest fixtures --- .../executors/test_kubernetes_executor.py | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index a7814a25ef5d5..302f6035c5528 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1507,19 +1507,24 @@ def test_adopt_completed_pods_single_scheduler_unchanged( header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) - @staticmethod - def _completed_result(name): - """Build a KubernetesResults entry as ``_adopt_completed_pods`` would add to ``self.completed``.""" - return KubernetesResults( - key=TaskInstanceKey("dag", name, "run_id", 1, -1), - state="completed", - pod_name=name, - namespace="somens", - resource_version="0", - failure_details=None, - ) + @pytest.fixture + def completed_result(self): + """Factory for a KubernetesResults entry as ``_adopt_completed_pods`` would add to ``self.completed``.""" + + def _make(name): + return KubernetesResults( + key=TaskInstanceKey("dag", name, "run_id", 1, -1), + state="completed", + pod_name=name, + namespace="somens", + resource_version="0", + failure_details=None, + ) + + return _make - def _make_sync_ready_executor(self): + @pytest.fixture + def sync_ready_executor(self): """An executor whose ``sync()`` exercises only the ``self.completed`` drain. Both queues are mocked so their ``get_nowait`` raises ``Empty`` immediately (as an @@ -1543,10 +1548,10 @@ def _make_sync_ready_executor(self): @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state" ) - def test_sync_drains_and_clears_completed(self, mock_change_state): + def test_sync_drains_and_clears_completed(self, mock_change_state, sync_ready_executor, completed_result): """``sync()`` processes every entry in ``self.completed`` exactly once and clears the set.""" - executor = self._make_sync_ready_executor() - results = {self._completed_result(name) for name in ("one", "two", "three")} + executor = sync_ready_executor + results = {completed_result(name) for name in ("one", "two", "three")} executor.completed = set(results) executor.sync() @@ -1558,12 +1563,14 @@ def test_sync_drains_and_clears_completed(self, mock_change_state): @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state" ) - def test_sync_retains_completed_entry_on_change_state_failure(self, mock_change_state): + def test_sync_retains_completed_entry_on_change_state_failure( + self, mock_change_state, sync_ready_executor, completed_result + ): """A ``self.completed`` entry whose ``_change_state`` raises is retained; the rest still drain.""" - executor = self._make_sync_ready_executor() - good_one = self._completed_result("good-one") - good_two = self._completed_result("good-two") - bad = self._completed_result("bad") + executor = sync_ready_executor + good_one = completed_result("good-one") + good_two = completed_result("good-two") + bad = completed_result("bad") executor.completed = {good_one, good_two, bad} def fail_only_bad(result):