From 874886dea3a9867b731967aaee5ed18c86dd710e Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sun, 14 May 2023 02:30:48 +0530 Subject: [PATCH 1/2] Fix Kubernetes executor setting wrong task status When multiple schedulers are running with lots of tasks and a scheduler restarts and tries to adopt pods, it can in some cases set the wrong task status. In this PR, I'm changing the checks so that, if the pod status is non-terminal (Pending or Running), the task status is set to Failed only if the pod event type is DELETED and the pod's deletion_timestamp is set --- airflow/executors/kubernetes_executor.py | 9 +++++++-- tests/executors/test_kubernetes_executor.py | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 10aef9b5328ad..24b988c6512bf 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -220,10 +220,13 @@ def process_status( resource_version: str, event: Any, ) -> None: + kube_pod = event["object"] annotations_string = annotations_for_logging_task_metadata(annotations) """Process status response.""" if status == "Pending": - if event["type"] == "DELETED": + # deletion_timestamp is set by kube server when a graceful deletion is requested. 
+ # since kube server have received request to delete pod set TI state failed + if event["type"] == "DELETED" and kube_pod.metadata.deletion_timestamp is not None: self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version)) else: @@ -251,7 +254,9 @@ def process_status( self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) elif status == "Running": - if event["type"] == "DELETED": + # deletion_timestamp is set by kube server when a graceful deletion is requested. + # since kube server have received request to delete pod set TI state failed + if event["type"] == "DELETED" and kube_pod.metadata.deletion_timestamp is not None: self.log.info( "Event: Pod %s deleted before it could complete, annotations: %s", pod_name, diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index fc9daaacde2ca..ba9c33a36e421 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -1261,6 +1261,7 @@ def test_process_status_pending(self): def test_process_status_pending_deleted(self): self.events.append({"type": "DELETED", "object": self.pod}) + self.pod.metadata.deletion_timestamp = datetime.utcnow() self._run() self.assert_watcher_queue_called_once_with_state(State.FAILED) @@ -1305,6 +1306,7 @@ def test_process_status_succeeded_type_delete(self): def test_process_status_running_deleted(self): self.pod.status.phase = "Running" + self.pod.metadata.deletion_timestamp = datetime.utcnow() self.events.append({"type": "DELETED", "object": self.pod}) self._run() From c8fe9e0b6e8c29ba7bde343f1cd674a358f38eee Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 8 Jun 2023 13:28:43 +0530 Subject: [PATCH 2/2] cleanup --- airflow/executors/kubernetes_executor.py | 7 
+++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 24b988c6512bf..8707bf96990c4 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -220,13 +220,13 @@ def process_status( resource_version: str, event: Any, ) -> None: - kube_pod = event["object"] + pod = event["object"] annotations_string = annotations_for_logging_task_metadata(annotations) """Process status response.""" if status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed - if event["type"] == "DELETED" and kube_pod.metadata.deletion_timestamp is not None: + if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version)) else: @@ -240,7 +240,6 @@ def process_status( # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has # a deletion timestamp, we've already seen the initial Succeeded event and sent it # along to the scheduler. - pod = event["object"] if ( event["type"] == "DELETED" or POD_EXECUTOR_DONE_KEY in pod.metadata.labels @@ -256,7 +255,7 @@ def process_status( elif status == "Running": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed - if event["type"] == "DELETED" and kube_pod.metadata.deletion_timestamp is not None: + if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: self.log.info( "Event: Pod %s deleted before it could complete, annotations: %s", pod_name,