Clarify the "task state changed externally" error message.

The backfill and scheduler job runners now build the message eagerly with
f-strings and append " Extra info: ..." only when `info` is not None. The
scheduler test expectation is updated to the new wording.

Because the scheduler message is already fully formatted, the call to
self._task_context_logger.error() no longer passes the old %-style
positional arguments (ti, state, ti.state, info); the message template has
no %-placeholders for them any more, so only ti=ti is kept.

NOTE(review): leading indentation inside the hunks and the blank context
lines were reconstructed from the Python block structure and the hunk line
counts -- confirm against the target tree before applying.
NOTE(review): in the test hunk the second fragment of the expected message
is a bare " "; presumably it should carry the TaskInstance repr -- confirm.

diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 4fd6b24bbfc2a..0b24dde8f02a0 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -313,9 +313,14 @@ def _manage_executor_state(
                 and ti.state in self.STATES_COUNT_AS_RUNNING
             ):
                 msg = (
-                    f"Executor reports task instance {ti} finished ({state}) although the task says its "
-                    f"{ti.state}. Was the task killed externally? Info: {info}"
+                    f"The executor reported that the task instance {ti} finished with state {state}, "
+                    f"but the task instance's state attribute is {ti.state}. "
+                    "This indicates that the task was marked failed by something other than the scheduler. "
+                    "The task might have been marked failed by a user, by the task_queued_timeout configuration, "
+                    "or it might have been killed by something else."
                 )
+                if info is not None:
+                    msg += f" Extra info: {info}"
                 self.log.error(msg)
                 ti.handle_failure(error=msg)
                 continue
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 3d81aaeb56276..72ae4cb13ebd4 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -773,10 +773,9 @@ def _process_executor_events(self, session: Session) -> int:
                     "scheduler.tasks.killed_externally",
                     tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
                 )
-                msg = (
-                    "Executor reports task instance %s finished (%s) although the "
-                    "task says it's %s. (Info: %s) Was the task killed externally?"
-                )
-                self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti)
+                msg = f"The executor reported that the task instance {ti} finished with state {state}, but the task instance's state attribute is {ti.state}. This indicates that the task was marked failed by something other than the scheduler. The task might have been marked failed by a user, by the task_queued_timeout configuration, or it might have been killed by something else."
+                if info is not None:
+                    msg += f" Extra info: {info}"
+                self._task_context_logger.error(msg, ti=ti)
 
                 # Get task from the Serialized DAG
@@ -792,12 +791,12 @@ def _process_executor_events(self, session: Session) -> int:
                     request = TaskCallbackRequest(
                         full_filepath=ti.dag_model.fileloc,
                         simple_task_instance=SimpleTaskInstance.from_ti(ti),
-                        msg=msg % (ti, state, ti.state, info),
+                        msg=msg,
                         processor_subdir=ti.dag_model.processor_subdir,
                     )
                     self.job.executor.send_callback(request)
                 else:
-                    ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)
+                    ti.handle_failure(error=msg, session=session)
 
         return len(event_buffer)
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9fcdba5ac5abb..1ff520c7198b2 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -400,10 +400,12 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_
             full_filepath=dag.fileloc,
             simple_task_instance=mock.ANY,
             processor_subdir=None,
-            msg="Executor reports task instance "
+            msg="The executor reported that the task instance "
             " "
-            "finished (failed) although the task says it's queued. (Info: None) "
-            "Was the task killed externally?",
+            "finished with state failed, but the task instance's state attribute is queued. "
+            "This indicates that the task was marked failed by something other than the scheduler. "
+            "The task might have been marked failed by a user, by the task_queued_timeout configuration, "
+            "or it might have been killed by something else.",
         )
         scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
         scheduler_job.executor.callback_sink.reset_mock()