Skip to content

Tighten on_task_instance_failed error type to BaseException | None#66399

Closed
1fanwang wants to merge 3 commits into
apache:mainfrom
1fanwang:1fanwang/listener-failed-error-type
Closed

Tighten on_task_instance_failed error type to BaseException | None#66399
1fanwang wants to merge 3 commits into
apache:mainfrom
1fanwang:1fanwang/listener-failed-error-type

Conversation

@1fanwang

@1fanwang 1fanwang commented May 5, 2026

Copy link
Copy Markdown
Contributor

Description

The error argument on on_task_instance_failed was typed
None | str | BaseException. The string variant only ever appeared on
the manual-set FAILED state path on the API server, where the call site
passed a hard-coded human-readable message. Listener implementations had
to isinstance(error, str) to detect that path even though
str(error) worked uniformly across both branches.

This PR:

  • Wraps the manual-set message in a RuntimeError at the API call site
    in _emit_state_listener_hooks.
  • Tightens the on_task_instance_failed hookspec type to
    BaseException | None.
  • Updates the example listener plugin and the ClassBasedListener test
    fixture to use the new type, and exposes last_error on the fixture.
  • Extends the existing parametrized
    test_patch_task_instance_notifies_listeners test: when the new
    state is failed, asserts the listener received a RuntimeError
    whose str() carries the human-readable manual-set message.

Why now

Listeners now have a uniform contract: error is always either an
exception or None. The msg arg added in #66394 already lets a
listener distinguish manual-set from worker paths
(msg == \"manually_set_to_failed\"); the string error was a second,
redundant signal carrying the same intent in lossier form.

Backwards compatibility

This is a typed-API change. Listener implementations that rely on
isinstance(error, str) to detect the manual-set path will need to
read the message via str(error) and route on msg (or
isinstance(error, RuntimeError)).

The existing msg keyword argument is the recommended dispatch axis
going forward — it doesn't depend on inspecting error at all.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.

E2E validation

=== hookspec signature ===
  on_task_instance_failed(..., error: BaseException | None)
  PASS — bare str alternative removed

=== API server manual-FAILED path wraps in RuntimeError ===
  found: error=RuntimeError(f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.")

=== runtime: listener receives RuntimeError ===
  type: RuntimeError
  str:  TaskInstance's state was manually set to `failed`.

Listener authors who previously did isinstance(error, str) to detect the manual-set path now read str(error) for the message and route on msg == "manually_set_to_failed" (introduced in #66394).

Real e2e validation (Airflow standalone)

Re-ran with airflow standalone against the worktree's editable install. Triggered a successful task run, then PATCHed its state to failed via the API server's /api/v2/dags/.../taskInstances/{task} endpoint to exercise _emit_state_listener_hooks. Recording listener captured the error arg type:

failed   prev=None  task=ok_task  error_type=RuntimeError  error_str=TaskInstance's state was manually set to `failed`.

Compare to the same path on PR #66394 (without this PR applied):

failed   prev=None  task=ok_task  error_type=str           error=TaskInstance's state was manually set to `failed`.

Same human-readable message, but the listener now receives a RuntimeError instance — isinstance(error, BaseException) works uniformly across worker-side and API-driven failure paths. Listener authors no longer need an isinstance(error, str) check to detect the manual-set path.

Integrated mega-branch validation (all 7 PRs composed)

This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services — airflow standalone running scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

PR Surface Evidence in log
#66394 (msg arg) every TI hook has msg=... 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed)
#66395 (hook-name log, TI) logs identify the failing hook tested separately with throwing listener — see PR body
#66397 (hook-name log, rest) lifecycle / DagRun / asset surfaces tested separately with throwing listener — see PR body
#66399 (tighten error type) error: BaseException | None manual-set path delivers RuntimeError (was str on PR-A alone)
#66402 (CHECKPOINTED state) worker catches AirflowTaskCheckpointed running → checkpointed transition observed at the listener and at the supervisor message boundary
#66405 (FailureDetails) listener can declare failure_details kwarg failure_details=None flowing through every failure (no executor populates yet)
#66410 (on_task_instance_checkpointed) new hook fires with payload checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:

Last two would have broken every task failure on apache/airflow main if the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish() which the API server constrains to terminal states. The mega listener log shows the worker successfully logging Task checkpointed; reporting CHECKPOINTED state. and on_task_instance_checkpointed firing with the correct payload — but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.

The ``error`` arg on ``on_task_instance_failed`` was typed
``None | str | BaseException``. The string variant only ever appeared on the
manual-set FAILED state path on the API server, where the call site passed
a hard-coded human-readable message. Listener implementations had to
``isinstance(error, str)`` to detect that path even though ``str(error)``
worked uniformly across both branches.

This change wraps the manual-set message in a ``RuntimeError`` at the call
site, tightens the hookspec type to ``BaseException | None``, and updates
the example listener and test fixture accordingly. Listeners now always
receive an exception type with ``str(error)`` carrying the message; the
``msg`` arg added in apache#66394 (``msg == "manually_set_to_failed"``) remains
the canonical signal for "this came from the API path".

Backwards compatibility: listeners relying on ``isinstance(error, str)``
will need to read the message via ``str(error)`` and route on ``msg``.
@boring-cyborg boring-cyborg Bot added the area:API Airflow's REST/HTTP API label May 5, 2026
@potiuk

potiuk commented Jun 1, 2026

Copy link
Copy Markdown
Member

@1fanwang This draft PR has had no activity for 2 weeks. Closing to keep the queue clean.

You are welcome to reopen and continue when you're ready. If you'd like to pick it back up, please rebase onto the current main branch first.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk potiuk closed this Jun 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants