diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 10aef9b5328ad..8707bf96990c4 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -220,10 +220,13 @@ def process_status( resource_version: str, event: Any, ) -> None: + pod = event["object"] annotations_string = annotations_for_logging_task_metadata(annotations) """Process status response.""" if status == "Pending": - if event["type"] == "DELETED": + # deletion_timestamp is set by the kube server when a graceful deletion is requested. + # Since the kube server has received a request to delete the pod, set the TI state to failed. + if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version)) else: @@ -237,7 +240,6 @@ def process_status( # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has # a deletion timestamp, we've already seen the initial Succeeded event and sent it # along to the scheduler. - pod = event["object"] if ( event["type"] == "DELETED" or POD_EXECUTOR_DONE_KEY in pod.metadata.labels @@ -251,7 +253,9 @@ def process_status( self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) elif status == "Running": - if event["type"] == "DELETED": + # deletion_timestamp is set by the kube server when a graceful deletion is requested. 
+ # Since the kube server has received a request to delete the pod, set the TI state to failed. + if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: self.log.info( "Event: Pod %s deleted before it could complete, annotations: %s", pod_name, diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index fc9daaacde2ca..ba9c33a36e421 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -1261,6 +1261,7 @@ def test_process_status_pending(self): def test_process_status_pending_deleted(self): self.events.append({"type": "DELETED", "object": self.pod}) + self.pod.metadata.deletion_timestamp = datetime.utcnow() self._run() self.assert_watcher_queue_called_once_with_state(State.FAILED) @@ -1305,6 +1306,7 @@ def test_process_status_succeeded_type_delete(self): def test_process_status_running_deleted(self): self.pod.status.phase = "Running" + self.pod.metadata.deletion_timestamp = datetime.utcnow() self.events.append({"type": "DELETED", "object": self.pod}) self._run()