Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,12 @@ def _manage_executor_state(
and ti.state in self.STATES_COUNT_AS_RUNNING
):
msg = (
f"Executor reports task instance {ti} finished ({state}) although the task says its "
f"{ti.state}. Was the task killed externally? Info: {info}"
f"The executor reported that the task instance {ti} finished with state {state}, "
f"but the task instance's state attribute is {ti.state}. "
"Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"
)
if info is not None:
msg += f" Extra info: {info}"
self.log.error(msg)
ti.handle_failure(error=msg)
continue
Expand Down
5 changes: 4 additions & 1 deletion airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ def handle_task_exit(self, return_code: int) -> None:
self.log.info("Task exited with return code %s (task deferral)", return_code)
_set_task_deferred_context_var()
else:
self.log.info("Task exited with return code %s", return_code)
message = f"Task exited with return code {return_code}"
if return_code == -signal.SIGKILL:
message += "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed"
self.log.info(message)

if not (self.task_instance.test_mode or is_deferral):
if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True):
Expand Down
11 changes: 7 additions & 4 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,12 @@ def _process_executor_events(self, session: Session) -> int:
tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
)
msg = (
"Executor reports task instance %s finished (%s) although the "
"task says it's %s. (Info: %s) Was the task killed externally?"
"The executor reported that the task instance %s finished with state %s, but the task instance's state attribute is %s. " # noqa: RUF100, UP031, flynt
"Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"
% (ti, state, ti.state)
)
if info is not None:
msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt
self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti)
Comment on lines +777 to 783

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduced a bug. You are formatting the text twice, one line 779, and then again in self._task_context_logger.error. Fix here: #40563


# Get task from the Serialized DAG
Expand All @@ -792,12 +795,12 @@ def _process_executor_events(self, session: Session) -> int:
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg % (ti, state, ti.state, info),
msg=msg,
processor_subdir=ti.dag_model.processor_subdir,
)
self.job.executor.send_callback(request)
else:
ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)
ti.handle_failure(error=msg, session=session)

return len(event_buffer)

Expand Down
13 changes: 7 additions & 6 deletions airflow/task/task_runner/standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import os
import signal
import threading
import time
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -74,7 +75,6 @@ def _start_by_fork(self):
else:
# Start a new process group
set_new_process_group()
import signal

signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
Expand Down Expand Up @@ -180,13 +180,14 @@ def terminate(self):

if self._rc is None:
# Something else reaped it before we had a chance, so let's just "guess" at an error code.
self._rc = -9
self._rc = -signal.SIGKILL

if self._rc == -9:
# If either we or psutil gives out a -9 return code, it likely means
# an OOM happened
if self._rc == -signal.SIGKILL:
self.log.error(
"Job %s was killed before it finished (likely due to running out of memory)",
(
"Job %s was killed before it finished (likely due to running out of memory)",
"For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed",
),
self._task_instance.job_id,
)

Expand Down
7 changes: 6 additions & 1 deletion docs/apache-airflow/core-concepts/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ No system runs perfectly, and task instances are expected to die once in a while

* *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
(e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these
periodically, clean them up, and either fail or retry the task depending on its settings.
periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for
many reasons, including:

* The Airflow worker ran out of memory and was OOMKilled.
* The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker.
* The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another.

* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task
Instances via the UI. Airflow will find them periodically and terminate them.
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ so coding will always be required.
public-airflow-interface
best-practices
faq
troubleshooting
Release Policies <release-process>
release_notes
privacy_notice
Expand Down
48 changes: 48 additions & 0 deletions docs/apache-airflow/troubleshooting.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _troubleshooting:

Troubleshooting
===============

Obscure task failures
^^^^^^^^^^^^^^^^^^^^^

Task state changed externally
-----------------------------

There are many potential causes for a task's state to be changed by a component other than the executor, which might cause some confusion when reviewing task instance or scheduler logs.

Below are some example scenarios that could cause a task's state to change by a component other than the executor:

- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout <config:core__dagbag_import_timeout>` and :ref:`core.dag_file_processor_timeout <config:core__dag_file_processor_timeout>`.
- The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>`.
- If a task becomes a :ref:`zombie <concepts:zombies>`, it will be marked failed by the scheduler.
- A user marked the task as successful or failed in the Airflow UI.
- An external script or process used the :doc:`Airflow REST API <stable-rest-api-ref>` to change the state of a task.

LocalTaskJob killed
-------------------

Sometimes, Airflow or some adjacent system will kill a task instance's ``LocalTaskJob``, causing the task instance to fail.

Here are some examples that could cause such an event:

- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
- An Airflow worker running out of memory
- Usually, Airflow workers that run out of memory receive a SIGKILL and are marked as a zombie and failed by the scheduler. However, in some scenarios, Airflow kills the task before that happens.
6 changes: 3 additions & 3 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,10 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_
full_filepath=dag.fileloc,
simple_task_instance=mock.ANY,
processor_subdir=None,
msg="Executor reports task instance "
msg="The executor reported that the task instance "
"<TaskInstance: test_process_executor_events_with_callback.dummy_task test [queued]> "
"finished (failed) although the task says it's queued. (Info: None) "
"Was the task killed externally?",
"finished with state failed, but the task instance's state attribute is queued. "
"Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally",
)
scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
scheduler_job.executor.callback_sink.reset_mock()
Expand Down