From 33afe7a18b4404ac3bc147bf67c5aa5a1447fd2d Mon Sep 17 00:00:00 2001 From: Ryan Hatter Date: Wed, 19 Jun 2024 16:21:32 -0400 Subject: [PATCH 1/4] revamp some confusing log messages --- airflow/jobs/backfill_job_runner.py | 7 ++- airflow/jobs/local_task_job_runner.py | 5 +- airflow/jobs/scheduler_job_runner.py | 10 ++-- .../task/task_runner/standard_task_runner.py | 7 +-- docs/apache-airflow/core-concepts/tasks.rst | 7 ++- docs/apache-airflow/troubleshooting.rst | 48 +++++++++++++++++++ tests/jobs/test_scheduler_job.py | 6 +-- 7 files changed, 76 insertions(+), 14 deletions(-) create mode 100644 docs/apache-airflow/troubleshooting.rst diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 4fd6b24bbfc2a..caa579fce1695 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -313,9 +313,12 @@ def _manage_executor_state( and ti.state in self.STATES_COUNT_AS_RUNNING ): msg = ( - f"Executor reports task instance {ti} finished ({state}) although the task says its " - f"{ti.state}. Was the task killed externally? Info: {info}" + f"The executor reported that the task instance {ti} finished with state {state}, " + f"but the task instance's state attribute is {ti.state}. " + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" ) + if info is not None: + msg += f" Extra info: {info}" self.log.error(msg) ti.handle_failure(error=msg) continue diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index 4dd949192e7c8..bf5a74418242e 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -238,7 +238,10 @@ def handle_task_exit(self, return_code: int) -> None: self.log.info("Task exited with return code %s (task deferral)", return_code) _set_task_deferred_context_var() else: - self.log.info("Task exited with return code %s", return_code) + message = f"Task exited with return code {return_code}" + if return_code == -9: + message += "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed" + self.log.info(message) if not (self.task_instance.test_mode or is_deferral): if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True): diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 3d81aaeb56276..25b2924355ec1 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -774,9 +774,11 @@ def _process_executor_events(self, session: Session) -> int: tags={"dag_id": ti.dag_id, "task_id": ti.task_id}, ) msg = ( - "Executor reports task instance %s finished (%s) although the " - "task says it's %s. (Info: %s) Was the task killed externally?" + f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}." + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" ) + if info is not None: + msg += f" Extra info: {info}" self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti) # Get task from the Serialized DAG @@ -792,12 +794,12 @@ def _process_executor_events(self, session: Session) -> int: request = TaskCallbackRequest( full_filepath=ti.dag_model.fileloc, simple_task_instance=SimpleTaskInstance.from_ti(ti), - msg=msg % (ti, state, ti.state, info), + msg=msg, processor_subdir=ti.dag_model.processor_subdir, ) self.job.executor.send_callback(request) else: - ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session) + ti.handle_failure(error=msg, session=session) return len(event_buffer) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 5ecf1ad64cebb..afc9a6b5644f5 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -183,10 +183,11 @@ def terminate(self): self._rc = -9 if self._rc == -9: - # If either we or psutil gives out a -9 return code, it likely means - # an OOM happened self.log.error( - "Job %s was killed before it finished (likely due to running out of memory)", + ( + "Job %s was killed before it finished (likely due to running out of memory)", + "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed", + ), self._task_instance.job_id, ) diff --git a/docs/apache-airflow/core-concepts/tasks.rst b/docs/apache-airflow/core-concepts/tasks.rst index ca1819603314d..0e05f55bcf5c8 100644 --- a/docs/apache-airflow/core-concepts/tasks.rst +++ b/docs/apache-airflow/core-concepts/tasks.rst @@ -245,7 +245,12 @@ No system runs perfectly, and task instances are expected to die once in a while * *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive (e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these - periodically, clean them up, and either fail or retry the task depending on its settings. + periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for + many reasons, including: + + * The Airflow worker ran out of memory and was OOMKilled. + * The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker. + * The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another. * *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them. diff --git a/docs/apache-airflow/troubleshooting.rst b/docs/apache-airflow/troubleshooting.rst new file mode 100644 index 0000000000000..c4245d09bef5d --- /dev/null +++ b/docs/apache-airflow/troubleshooting.rst @@ -0,0 +1,48 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _troubleshooting: + +Troubleshooting +=============== + +Obscure task failures +^^^^^^^^^^^^^^^^^^^^^ + +Task state changed externally +----------------------------- + +There are many potential causes for a task's state to be changed by a component other than the executor, which might cause some confusion when reviewing task instance or scheduler logs. + +Below are some example scenarios that could cause a task's state to change by a component other than the executor: + +- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout ` and :ref:`core.dag_file_processor_timeout `. +- The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout `. +- If a task becomes a :ref:`zombie `, it will be marked failed by the scheduler. +- A user marked the task as successful or failed in the Airflow UI. +- An external script or process used the :doc:`Airflow REST API ` to change the state of a task. + +LocalTaskJob killed +------------------- + +Sometimes, Airflow or some adjacent system will kill a task instance's ``LocalTaskJob``, causing the task instance to fail. + +Here are some examples that could cause such an event: + +- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition. +- An Airflow worker running out of memory + - Usually, Airflow workers that run out of memory receive a SIGKILL and are marked as a zombie and failed by the scheduler. However, in some scenarios, Airflow kills the task before that happens. diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 9fcdba5ac5abb..a282456c1f99e 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -400,10 +400,10 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_ full_filepath=dag.fileloc, simple_task_instance=mock.ANY, processor_subdir=None, - msg="Executor reports task instance " + msg="The executor reported that the task instance " " " - "finished (failed) although the task says it's queued. (Info: None) " - "Was the task killed externally?", + "finished with state failed, but the task instance's state attribute is queued. " + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally", ) scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback) scheduler_job.executor.callback_sink.reset_mock() From ac3d08411d80a297a455f21d6c96521d491ccf4f Mon Sep 17 00:00:00 2001 From: Ryan Hatter Date: Wed, 19 Jun 2024 18:07:00 -0400 Subject: [PATCH 2/4] fix tests --- airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 25b2924355ec1..087d610e42dbf 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -774,7 +774,7 @@ def _process_executor_events(self, session: Session) -> int: tags={"dag_id": ti.dag_id, "task_id": ti.task_id}, ) msg = ( - f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}." + f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}. " "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" ) if info is not None: From 65be11ac1f6c6fdf5f394801d50d3481d0bc84de Mon Sep 17 00:00:00 2001 From: Ryan Hatter Date: Sun, 30 Jun 2024 15:07:33 -0400 Subject: [PATCH 3/4] address comments and static checks --- airflow/jobs/local_task_job_runner.py | 2 +- airflow/jobs/scheduler_job_runner.py | 5 +++-- airflow/task/task_runner/standard_task_runner.py | 6 +++--- docs/apache-airflow/index.rst | 1 + 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index bf5a74418242e..96e36bcfe7d4b 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -239,7 +239,7 @@ def handle_task_exit(self, return_code: int) -> None: _set_task_deferred_context_var() else: message = f"Task exited with return code {return_code}" - if return_code == -9: + if return_code == -signal.SIGKILL: message += "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed" self.log.info(message) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 087d610e42dbf..61d6d7aecb62f 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -774,11 +774,12 @@ def _process_executor_events(self, session: Session) -> int: tags={"dag_id": ti.dag_id, "task_id": ti.task_id}, ) msg = ( - f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}. " + "The executor reported that the task instance %s finished with state %s, but the task instance's state attribute is %s. " # noqa: RUF100, UP031, flynt "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" + % (ti, state, ti.state) ) if info is not None: - msg += f" Extra info: {info}" + msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti) # Get task from the Serialized DAG diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index afc9a6b5644f5..5f0606fe5e245 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -21,6 +21,7 @@ import logging import os +import signal import threading import time from typing import TYPE_CHECKING @@ -74,7 +75,6 @@ def _start_by_fork(self): else: # Start a new process group set_new_process_group() - import signal signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) @@ -180,9 +180,9 @@ def terminate(self): if self._rc is None: # Something else reaped it before we had a chance, so let's just "guess" at an error code. - self._rc = -9 + self._rc = -signal.SIGKILL - if self._rc == -9: + if self._rc == -signal.SIGKILL: self.log.error( ( "Job %s was killed before it finished (likely due to running out of memory)", diff --git a/docs/apache-airflow/index.rst b/docs/apache-airflow/index.rst index 2c8c14f3d2cdf..3897be717866e 100644 --- a/docs/apache-airflow/index.rst +++ b/docs/apache-airflow/index.rst @@ -151,6 +151,7 @@ so coding will always be required. public-airflow-interface best-practices faq + troubleshooting Release Policies release_notes privacy_notice From c7e4dafa4f92b79c5113f0a78793f4c2bee76462 Mon Sep 17 00:00:00 2001 From: Ryan Hatter Date: Sun, 30 Jun 2024 15:31:41 -0400 Subject: [PATCH 4/4] fix docs build --- docs/apache-airflow/troubleshooting.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/troubleshooting.rst b/docs/apache-airflow/troubleshooting.rst index c4245d09bef5d..076aa5501bcac 100644 --- a/docs/apache-airflow/troubleshooting.rst +++ b/docs/apache-airflow/troubleshooting.rst @@ -30,7 +30,7 @@ There are many potential causes for a task's state to be changed by a component Below are some example scenarios that could cause a task's state to change by a component other than the executor: -- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout ` and :ref:`core.dag_file_processor_timeout `. +- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout ` and :ref:`core.dag_file_processor_timeout `. - The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout `. - If a task becomes a :ref:`zombie `, it will be marked failed by the scheduler. - A user marked the task as successful or failed in the Airflow UI.