Patch summary (text before the first "diff --git" header is ignored by patch tools).

kubernetes_executor.py, sync():
  The plain "for result in self.completed" loop is replaced by a loop that pops
  each entry from self.completed and calls _change_state() on it inside a
  try/except. An entry whose _change_state() raises Exception is logged with
  log.exception() and collected in a local set; after the loop that set is
  assigned back to self.completed.

test_kubernetes_executor.py:
  Adds two fixtures (completed_result, sync_ready_executor) and two tests that
  patch KubernetesExecutor._change_state and check that sync() calls it once
  per entry, leaves self.completed empty on success, and keeps only the entry
  whose _change_state() raised.

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 0dcc01537cb78..6ca463ed422d0 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -300,8 +300,20 @@ def sync(self) -> None:
                 finally:
                     self.result_queue.task_done()
 
-        for result in self.completed:
-            self._change_state(result)
+        failed = set()
+        while self.completed:
+            result = self.completed.pop()
+            try:
+                self._change_state(result)
+            except Exception as e:
+                self.log.exception(
+                    "Exception: %s when attempting to change state of %s to %s, re-queueing.",
+                    e,
+                    result,
+                    result.state,
+                )
+                failed.add(result)
+        self.completed = failed
 
         from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion
 
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index bc1c2a97f55c7..302f6035c5528 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1507,6 +1507,83 @@ def test_adopt_completed_pods_single_scheduler_unchanged(
             header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
 
+    @pytest.fixture
+    def completed_result(self):
+        """Factory for a KubernetesResults entry as ``_adopt_completed_pods`` would add to ``self.completed``."""
+
+        def _make(name):
+            return KubernetesResults(
+                key=TaskInstanceKey("dag", name, "run_id", 1, -1),
+                state="completed",
+                pod_name=name,
+                namespace="somens",
+                resource_version="0",
+                failure_details=None,
+            )
+
+        return _make
+
+    @pytest.fixture
+    def sync_ready_executor(self):
+        """An executor whose ``sync()`` exercises only the ``self.completed`` drain.
+
+        Both queues are mocked so their ``get_nowait`` raises ``Empty`` immediately (as an
+        empty queue would), and ``kube_scheduler`` is mocked, so the only behavior under test
+        is how ``sync()`` processes ``self.completed``.
+        """
+        from queue import Empty
+
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "modified"
+        executor.kube_client = mock.MagicMock()
+        executor.kube_scheduler = mock.MagicMock()
+        executor.result_queue = mock.MagicMock()
+        executor.result_queue.get_nowait.side_effect = Empty
+        executor.task_queue = mock.MagicMock()
+        executor.task_queue.get_nowait.side_effect = Empty
+        executor.create_pods_after = None
+        executor.kube_config.worker_pods_creation_batch_size = 1
+        return executor
+
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state"
+    )
+    def test_sync_drains_and_clears_completed(self, mock_change_state, sync_ready_executor, completed_result):
+        """``sync()`` processes every entry in ``self.completed`` exactly once and clears the set."""
+        executor = sync_ready_executor
+        results = {completed_result(name) for name in ("one", "two", "three")}
+        executor.completed = set(results)
+
+        executor.sync()
+
+        assert {call.args[0] for call in mock_change_state.call_args_list} == results
+        assert mock_change_state.call_count == len(results)
+        assert executor.completed == set()
+
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state"
+    )
+    def test_sync_retains_completed_entry_on_change_state_failure(
+        self, mock_change_state, sync_ready_executor, completed_result
+    ):
+        """A ``self.completed`` entry whose ``_change_state`` raises is retained; the rest still drain."""
+        executor = sync_ready_executor
+        good_one = completed_result("good-one")
+        good_two = completed_result("good-two")
+        bad = completed_result("bad")
+        executor.completed = {good_one, good_two, bad}
+
+        def fail_only_bad(result):
+            if result is bad:
+                raise RuntimeError("boom")
+
+        mock_change_state.side_effect = fail_only_bad
+
+        executor.sync()
+
+        assert mock_change_state.call_count == 3
+        assert executor.completed == {bad}
+
     @pytest.mark.db_test
     def test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, session):
         """``_alive_other_scheduler_job_ids`` must use an independent (non-scoped) session.