Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,15 @@ async def await_pod_start(
else:
remote_pod = pod_manager.read_pod(pod)
pod_status = remote_pod.status

if pod_status.phase == PodPhase.FAILED and pod_status.container_statuses is None:
pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchFailedException("Pod failed before containers started")

if pod_status.phase not in (PodPhase.PENDING, PodPhase.UNKNOWN):
pod_manager.stop_watching_events = True
pod_manager.log.info("Pod has reached %s phase before launch timeout", pod_status.phase)
pod_manager.log.info("::endgroup::")
break

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,28 @@ def pod_state_gen():
)
mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)

@pytest.mark.asyncio
async def test_start_pod_preemption_raises_error(self):
"""After a pod is scheduled on a node, it is possible that it gets preempted by another pod, such as a daemonset on a new node, it is possible this happens before
any containers are created. In that case airflow needs to recreate the pod.
"""

pod_response = mock.MagicMock()
pod_response.status.phase = "Failed"
pod_response.status.container_statuses = None
pod_response.status.message = "Pod was rejected: Node didn't have enough resource: memory, requested: 547356672, used: 14813233152, capacity: 15334334464"
pod_response.status.reason = "OutOfmemory"

self.mock_kube_client.read_namespaced_pod.return_value = pod_response
expected_msg = "Pod failed before containers started"
mock_pod = MagicMock()
with pytest.raises(AirflowException, match=expected_msg):
await self.pod_manager.await_pod_start(
pod=mock_pod,
schedule_timeout=60,
startup_timeout=60,
)

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_container_is_running(self, container_is_running_mock):
mock_pod = MagicMock()
Expand Down
Loading