diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 835d239f209f2..ec7d339f37769 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -174,8 +174,15 @@ async def await_pod_start( else: remote_pod = pod_manager.read_pod(pod) pod_status = remote_pod.status + + if pod_status.phase == PodPhase.FAILED and pod_status.container_statuses is None: + pod_manager.stop_watching_events = True + pod_manager.log.info("::endgroup::") + raise PodLaunchFailedException("Pod failed before containers started") + if pod_status.phase not in (PodPhase.PENDING, PodPhase.UNKNOWN): pod_manager.stop_watching_events = True + pod_manager.log.info("Pod has reached %s phase before launch timeout", pod_status.phase) pod_manager.log.info("::endgroup::") break diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index e2390f90dc3a9..2f5245217d91f 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -1044,6 +1044,28 @@ def pod_state_gen(): ) mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) + @pytest.mark.asyncio + async def test_start_pod_preemption_raises_error(self): + """After a pod is scheduled on a node, it may be preempted by another pod, such as a daemonset on a new node, and this can happen before + any containers are created. In that case Airflow needs to recreate the pod. 
+ """ + + pod_response = mock.MagicMock() + pod_response.status.phase = "Failed" + pod_response.status.container_statuses = None + pod_response.status.message = "Pod was rejected: Node didn't have enough resource: memory, requested: 547356672, used: 14813233152, capacity: 15334334464" + pod_response.status.reason = "OutOfmemory" + + self.mock_kube_client.read_namespaced_pod.return_value = pod_response + expected_msg = "Pod failed before containers started" + mock_pod = MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=60, + startup_timeout=60, + ) + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running") def test_container_is_running(self, container_is_running_mock): mock_pod = MagicMock()