9 changes: 7 additions & 2 deletions airflow/jobs/backfill_job_runner.py
@@ -313,9 +313,14 @@ def _manage_executor_state(
and ti.state in self.STATES_COUNT_AS_RUNNING
):
msg = (
f"Executor reports task instance {ti} finished ({state}) although the task says its "
f"{ti.state}. Was the task killed externally? Info: {info}"
f"The executor reported that the task instance {ti} finished with state {state}, "
f"but the task instance's state attribute is {ti.state}. "
"This indicates that the task was marked failed by something other than the scheduler. "

Member


It might not be marked failed though? Should you be using ti.state here?

Member


I am wondering if instead of saying ".....something other than the scheduler," it should be phrased as "something other than the worker/pod that is actually running the task," or something similar. The scheduler is still an external component and is responsible for failing tasks that are timed out while being stuck in the queue or detecting and killing zombie tasks. In fact, it is the scheduler that marks them as failed, no?

Member


I think you are right that it is not precise.

In fact, it is not necessarily the scheduler or the executor that marks a task as "failed". A task can also be marked as failed from the UI, directly in the DB, and that does not involve the scheduler or the executor at all; the process that monitors task execution will then see the change and stop the task process. So "externally" means that the process that ran the task either failed or was killed by something else (but NOT by setting the state of the task in the DB). I think explaining that in more detail would be useful, by pointing to a description of what happened rather than trying to fit it into one long sentence where we squeeze in every possible reason.

My initial idea was to write a short "this is how monitoring for task state works" section and a short description of the kinds of "external" factors that can kill the task:

  • tasks failing, usually with low-level C-library errors (SIGSEGV, BAD ACCESS etc.)
  • an external process sending a signal to the task (for example a deployment manager such as k8s deciding to free a machine or move the execution to another one by killing the tasks running on it)
  • OOM, where the deployment does not have enough memory to share with the task
  • external scripts or admins sending a signal to the task
  • hardware errors (faulty memory/disk)

Etc.
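
Several of the causes listed above end with the task process being terminated by a signal. The following is a minimal sketch, not Airflow code, of how a supervising process on a POSIX system could tell a signal-induced death apart from an ordinary exit. The helper name `describe_exit` and the self-killing child are hypothetical and exist only for illustration.

```python
import signal
import subprocess
import sys


def describe_exit(returncode: int) -> str:
    """Classify a child's exit status (hypothetical helper, POSIX semantics)."""
    if returncode == 0:
        return "exited normally"
    if returncode < 0:
        # On POSIX, subprocess reports death-by-signal as the negated signal number.
        return f"killed by {signal.Signals(-returncode).name}"
    return f"exited with error code {returncode}"


# Stand-in for a task process: the child sends SIGKILL to itself, which is
# roughly what an OOM killer or a deployment manager would do from outside.
child = subprocess.run(
    [sys.executable, "-c", "import os, signal; os.kill(os.getpid(), signal.SIGKILL)"]
)
print(describe_exit(child.returncode))
```

A process killed this way never gets the chance to record its own final state, which is consistent with the state mismatch the error message in this PR describes.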

Explaining all those reasons would not fit into a simple error message, but a somewhat generic description that points (via URL) to a detailed description in our documentation would be a great resource, both for users who are "proficient enough" to follow up on these clues and for ... us. Consider committers or triage people trying to help less-proficient users who will not follow (or even click) that URL: when such a user copies and pastes the error message, the triage team member WILL follow the link and learn about it, even if they did not know how it works.
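
The suggestion of a short, generic message that links to the documentation could look roughly like the sketch below. It is not what the PR implements: the function name `build_state_mismatch_message` is hypothetical, and `DOCS_URL` is a placeholder rather than a real documentation page.

```python
from typing import Optional

# Placeholder only: no such documentation page is named in this discussion.
DOCS_URL = "https://example.invalid/docs/task-killed-externally"


def build_state_mismatch_message(
    ti: str, executor_state: str, ti_state: str, info: Optional[str] = None
) -> str:
    """Build a short error message that defers the list of causes to the docs."""
    msg = (
        f"The executor reported that the task instance {ti} finished with state "
        f"{executor_state}, but the task instance's state attribute is {ti_state}. "
        f"See {DOCS_URL} for possible causes."
    )
    # Mirror the diff: append executor-provided info only when there is any.
    if info is not None:
        msg += f" Extra info: {info}"
    return msg


print(build_state_mismatch_message("<TaskInstance: demo>", "failed", "queued"))
```

Keeping the causes in the documentation means the message stays stable while the list of known external factors grows.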

"The task might have been marked failed by a user, by the task_queued_timeout configuration, "
"or it might have been killed by something else."
)
if info is not None:
msg += f" Extra info: {info}"
self.log.error(msg)
ti.handle_failure(error=msg)
continue
11 changes: 5 additions & 6 deletions airflow/jobs/scheduler_job_runner.py
@@ -773,10 +773,9 @@ def _process_executor_events(self, session: Session) -> int:
"scheduler.tasks.killed_externally",
tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
)
msg = (
"Executor reports task instance %s finished (%s) although the "
"task says it's %s. (Info: %s) Was the task killed externally?"
)
msg = f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}. This indicates that the task was marked failed by something other than the scheduler. The task might have been marked failed by a user, by the task_queued_timeout configuration, or it might have been killed by something else."
if info is not None:
msg += f" Extra info: {info}"
self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti)

# Get task from the Serialized DAG
@@ -792,12 +791,12 @@ def _process_executor_events(self, session: Session) -> int:
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg % (ti, state, ti.state, info),
msg=msg,
processor_subdir=ti.dag_model.processor_subdir,
)
self.job.executor.send_callback(request)
else:
ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)
ti.handle_failure(error=msg, session=session)

return len(event_buffer)
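
One detail in the hunk above may be worth checking: the message is now a pre-formatted f-string, yet the logger call still receives `ti, state, ti.state, info` as positional arguments. The sketch below uses only the standard library and assumes that the task-context logger forwards those arguments to a `logging.LogRecord` the way stdlib logging does, which the diff itself does not confirm. The sample values are made up for illustration.

```python
import logging

# Hypothetical sample values standing in for the real task instance fields.
ti, state, ti_state, info = "<TaskInstance: demo>", "failed", "queued", None

# Old style: %-placeholders in the template, values passed separately.
lazy = logging.LogRecord(
    "demo", logging.ERROR, "demo.py", 0,
    "task instance %s finished (%s) although the task says it's %s. (Info: %s)",
    (ti, state, ti_state, info), None,
)
print(lazy.getMessage())

# New style as written in the diff: an f-string with no placeholders left,
# but the same positional arguments still attached to the record.
eager = logging.LogRecord(
    "demo", logging.ERROR, "demo.py", 0,
    f"task instance {ti} finished with state {state}",
    (ti, state, ti_state, info), None,
)
caught = None
try:
    eager.getMessage()
except TypeError as exc:
    # %-formatting fails because there are arguments but no placeholders.
    caught = exc
print(type(caught).__name__)
```

If the assumption holds, dropping the leftover positional arguments from the logger call would avoid the formatting error.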

8 changes: 5 additions & 3 deletions tests/jobs/test_scheduler_job.py
@@ -400,10 +400,12 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_
full_filepath=dag.fileloc,
simple_task_instance=mock.ANY,
processor_subdir=None,
msg="Executor reports task instance "
msg="The executor reported that the task instance "
"<TaskInstance: test_process_executor_events_with_callback.dummy_task test [queued]> "
"finished (failed) although the task says it's queued. (Info: None) "
"Was the task killed externally?",
"finished with state failed, but the task instance's state attribute is queued. "
"This indicates that the task was marked failed by something other than the scheduler. "
"The task might have been marked failed by a user, by the task_queued_timeout configuration, "
"or it might have been killed by something else.",
)
scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
scheduler_job.executor.callback_sink.reset_mock()