Requeue KubernetesExecutor tasks whose pod failed before execution started #69058

Open
seanmuth wants to merge 6 commits into apache:main from seanmuth:feature/k8s-executor-pre-execution-requeue

Conversation

@seanmuth

Contributor

When a worker pod is destroyed before the task process starts — a node drain, autoscaler scale-down, node boot race, or transient image pull failure — the task instance is still in queued state and no task code has run. Today the KubernetesExecutor reports this to the scheduler as a normal FAILED, which consumes a user-configured task retry and raises a misleading failure alert for work that never executed.

This adds a transparent, executor-level requeue for that case. In _change_state, a pod that reports FAILED while its task instance is still QUEUED is requeued onto the existing task_queue (the same mechanism task_publish_max_retries already uses for pod creation failures) without writing to the event buffer, so the scheduler never observes the failure and no task-level retry is consumed.

Behavior is bounded and configurable:

  • pod_launch_failure_retries (default 1, -1 unlimited, 0 disables) — how many times a task is transparently requeued before failing normally.
  • pod_launch_failure_excluded_container_reasons (default Error) — container reasons that opt out of the requeue path and consume a normal retry instead. The default excludes Error, which covers a container that started executing but whose worker process exited before writing running to the DB (most likely an Airflow-specific startup error rather than a transient infrastructure event).
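
The retry bound described in the first bullet could be read as the check below. This is a minimal sketch, not the PR's code: `may_requeue` and its parameter names are hypothetical, and only the three documented values (-1, 0, and a positive count) come from the description.

```python
def may_requeue(requeues_so_far: int, max_retries: int) -> bool:
    """Return True if another transparent requeue is allowed.

    Hypothetical helper mirroring the documented semantics of
    pod_launch_failure_retries: -1 is unlimited, 0 disables, N allows N requeues.
    """
    if max_retries == 0:
        return False  # feature disabled: the task fails normally
    if max_retries < 0:
        return True  # -1 means unlimited requeues
    return requeues_so_far < max_retries
```

With the default of 1, the first pre-execution failure is requeued and the second one is reported as a normal failure.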

The ti_state == QUEUED check is the authoritative signal: a task that was actually executing would already have transitioned to running, so OOM-kills and other mid-execution failures are unaffected. Deferrable-operator resume pods are covered for free — when the triggerer fires, the TI returns to queued, so a resume pod killed before execute_complete starts is requeued rather than discarding already-completed external work.
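
The decision described above can be sketched as a small predicate. The `State` enum is a stand-in for Airflow's `TaskInstanceState`, and `is_pre_execution_failure` with a plain `container_reason` string is a simplification assumed for illustration; the real helper in this PR receives a `failure_details` object.

```python
from enum import Enum


class State(str, Enum):
    # Stand-in for airflow's TaskInstanceState; only the values used here.
    QUEUED = "queued"
    RUNNING = "running"
    FAILED = "failed"


def is_pre_execution_failure(pod_state, ti_state, container_reason, excluded_reasons):
    """Hypothetical predicate: did the pod fail before the task started running?"""
    if pod_state != State.FAILED:
        return False
    if ti_state != State.QUEUED:
        return False  # the task reached running, so this is an execution failure
    # Excluded reasons (default: "Error") consume a normal retry instead.
    return container_reason not in excluded_reasons
```

Under these assumptions an OOM-kill of a running task is rejected by the second check, and a queued task whose container reports `Error` is rejected by the third.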

closes: #69052


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.8)

Generated-by: Claude Code (Opus 4.8) following the guidelines

@boring-cyborg boring-cyborg Bot added area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues labels Jun 26, 2026
@seanmuth

Contributor Author

Working on a test confirmation in Astro Hosted now

Comment on lines +501 to +506
if state == TaskInstanceState.FAILED and self._is_pre_execution_failure(
    state,
    self._get_task_instance_state(key, session=session),
    failure_details,
    self.pod_launch_failure_excluded_container_reasons,
):

Member

Suggested change
- if state == TaskInstanceState.FAILED and self._is_pre_execution_failure(
-     state,
-     self._get_task_instance_state(key, session=session),
-     failure_details,
-     self.pod_launch_failure_excluded_container_reasons,
- ):
+ if (
+     state == TaskInstanceState.FAILED
+     and self.pod_launch_failure_max_retries != 0
+     and self._is_pre_execution_failure(
+         state,
+         self._get_task_instance_state(key, session=session),
+         failure_details,
+         self.pod_launch_failure_excluded_container_reasons,
+     )
+ ):

This saves some db queries when no retries are allowed. Not a big deal, I think? (It shouldn’t be too common to explicitly disallow retries.)

Contributor Author

Applied in 2352e1f453 — the pod_launch_failure_max_retries != 0 guard now short-circuits before the task-instance state lookup, so the DB query is skipped entirely when requeues are disabled.
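
Why the guard order matters can be shown with a toy model. `FakeExecutor`, `get_ti_state`, and `should_requeue` are hypothetical names used only for this sketch; the counter stands in for the metadata-db query.

```python
class FakeExecutor:
    """Toy stand-in for the executor; counts how often the 'DB' is queried."""

    def __init__(self, max_retries: int):
        self.max_retries = max_retries
        self.lookups = 0

    def get_ti_state(self) -> str:
        self.lookups += 1  # pretend this is a metadata-db query
        return "queued"

    def should_requeue(self, pod_state: str) -> bool:
        # `and` evaluates left to right and stops at the first falsy operand,
        # so get_ti_state() is never called when max_retries == 0.
        return (
            pod_state == "failed"
            and self.max_retries != 0
            and self.get_ti_state() == "queued"
        )


disabled = FakeExecutor(max_retries=0)
enabled = FakeExecutor(max_retries=1)
disabled.should_requeue("failed")
enabled.should_requeue("failed")
print(disabled.lookups, enabled.lookups)  # prints "0 1"
```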


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

Comment on lines +471 to +484
if state == ADOPTED:
    # When the task pod is adopted by another executor,
    # then remove the task from the current executor running queue.
    self.last_known_jobs.pop(key, None)
    try:
        self.running.remove(key)
    except KeyError:
        self.log.debug("TI key not in running: %s", key)
    return

if state == TaskInstanceState.RUNNING:
    # The task process started, so any later failure is an execution failure that should
    # not be requeued by the pre-execution path below.
    self.last_known_jobs.pop(key, None)

Member

These should probably also clean pod_launch_failure_max_retries?

I wonder if we should wrap the containers into an object so they are always handled together.

Contributor Author

Done in 2352e1f453 — the job spec, requeue count, and last-requeued pod are now wrapped in a single _PodLaunchAttempt per key (pod_launch_attempts), popped in one place across all exit paths. That also fixes the count leaking: the previous separate pod_launch_failure_attempts counter was never cleared on the ADOPTED path.
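
A per-key record of this kind might look like the sketch below. It is an assumption-laden illustration, not the PR's class: the name `PodLaunchAttempt`, the field names, and the `forget` helper are hypothetical, and the key is a plain string instead of a task-instance key.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class PodLaunchAttempt:
    """Hypothetical per-key record holding everything the requeue path needs."""

    job: Any  # the job spec to put back on the task queue
    requeue_count: int = 0
    requeued_for_pod: str | None = None  # pod the last requeue was issued for


pod_launch_attempts: dict[str, PodLaunchAttempt] = {}


def forget(key: str) -> PodLaunchAttempt | None:
    # Single cleanup point: job, count, and pod name disappear together.
    return pod_launch_attempts.pop(key, None)


pod_launch_attempts["ti-1"] = PodLaunchAttempt(job={"cmd": "run"}, requeue_count=1)
forget("ti-1")  # e.g. on the ADOPTED path
forget("ti-1")  # popping a missing key is a harmless no-op
```

Keeping the three values in one object means no exit path can clear one of them and leave the others behind, which is the leak described above.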


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

@uranusjr

Member

We should add a release note for this, especially since it changes the default behavior: all pods are now retried once by default instead of failing directly.

We should probably also call out somewhere that if you set delete_worker_pods_on_failure = false (the default!), failed pods may accumulate when retries is set to -1 (indefinite). You can break the system quite badly with, say, a misconfigured image that crashes immediately on launch.

@arkadiuszbach arkadiuszbach left a comment

Contributor

Adding my findings, as I was working on task failures in KubernetesExecutor due to SIGTERM handling and this PR might address the failures before the Supervisor gets to register its own SIGTERM handler, see #69034

        self.log.debug("TI key not in running: %s", key)
    return

if state == TaskInstanceState.RUNNING:

Contributor

This seems to be dead code at the moment: TaskInstanceState.RUNNING is never set in process_status.

From what I understand, it would indicate the pod's Running status, but a pod may become Failed after being Running while the task is still in the QUEUED state, for example due to SIGTERM.

Contributor Author

Right, I removed the cleanup I'd added in the RUNNING branch, since the watcher never emits RUNNING. On the SIGTERM case you raised: when a pod reaches Running and then fails before the runner writes running, the task instance stays QUEUED, so the pre-execution path requeues it. That is the intended outcome (the task never committed to running), and it complements #69034.


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

)
# Leave the key in self.running and do not write to event_buffer: the scheduler
# never observes this failure, so no task-level retry is consumed.
self.task_queue.put(job)

Contributor

Kubernetes may generate multiple events with pod.status.phase=Failed in a short period of time for a single pod. I was checking recently what events it would generate due to SIGTERM and noticed two MODIFIED followed by DELETED, all pod.status.phase=Failed, see below:

2026-06-29T15:26:17.051533Z [debug    ] Event: test-kubernetes-0-l32duhyu had an event of type MODIFIED [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:159
2026-06-29T15:26:17.051986Z [debug    ] Creating task key for annotations {'dag_id': 'test', 'task_id': 'kubernetes_0', 'logical_date': None, 'run_id': 'manual__2026-06-25T14:26:59.504591+00:00', 'try_number': '3'} [airflow.providers.cncf.kubernetes.kubernetes_helper_functions] loc=kubernetes_helper_functions.py:159
2026-06-29T15:26:17.052418Z [warning  ] Event: test-kubernetes-0-l32duhyu Failed, task: test.kubernetes_0.3, annotations: <omitted> [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:308
2026-06-29T15:26:17.340492Z [debug    ] Event: test-kubernetes-0-l32duhyu had an event of type MODIFIED [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:159
2026-06-29T15:26:17.340786Z [debug    ] Creating task key for annotations {'dag_id': 'test', 'task_id': 'kubernetes_0', 'logical_date': None, 'run_id': 'manual__2026-06-25T14:26:59.504591+00:00', 'try_number': '3'} [airflow.providers.cncf.kubernetes.kubernetes_helper_functions] loc=kubernetes_helper_functions.py:159
2026-06-29T15:26:17.340985Z [warning  ] Event: test-kubernetes-0-l32duhyu Failed, task: test.kubernetes_0.3, annotations: <omitted> [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:308
2026-06-29T15:26:17.372148Z [debug    ] Running SchedulerJobRunner._create_dagruns_for_dags with retries. Try 1 of 50 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=retries.py:100
2026-06-29T15:26:17.381234Z [debug    ] Event: test-kubernetes-0-l32duhyu had an event of type DELETED [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:159
2026-06-29T15:26:17.381554Z [debug    ] Creating task key for annotations {'dag_id': 'test', 'task_id': 'kubernetes_0', 'logical_date': None, 'run_id': 'manual__2026-06-25T14:26:59.504591+00:00', 'try_number': '3'} [airflow.providers.cncf.kubernetes.kubernetes_helper_functions] loc=kubernetes_helper_functions.py:159
2026-06-29T15:26:17.381744Z [warning  ] Event: test-kubernetes-0-l32duhyu Failed, task: test.kubernetes_0.3, annotations: <omitted> [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:308

so maybe self.last_known_jobs.pop(key, None) should be placed right after self.task_queue.put(job)?

Contributor Author

Good catch — confirmed (MODIFIED×2 + DELETED on SIGTERM). I went a slightly different way than popping the job right after put: that would cap requeues at 1, because the requeue path re-queues the stored job directly rather than going back through execute_async to re-stash it. Instead the executor records the pod each requeue was issued for (_PodLaunchAttempt.requeued_for_pod) and ignores duplicate Failed events for that same pod, while a distinct (requeued) pod still requeues. Covered by test_change_state_pre_execution_failure_dedupes_repeated_events. Fixed in 2352e1f453.
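
The dedupe rule can be sketched as follows. Only the `requeued_for_pod` field name comes from the comment above; `Attempt`, `on_failed_event`, and the list standing in for the task queue are hypothetical, and the retry bound is left out to keep the example focused.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Attempt:
    requeue_count: int = 0
    requeued_for_pod: str | None = None


def on_failed_event(attempt: Attempt, pod_name: str, requeued: list[str]) -> bool:
    """Requeue at most once per pod; return True if a requeue was issued."""
    if attempt.requeued_for_pod == pod_name:
        return False  # duplicate Failed event (second MODIFIED, or DELETED)
    attempt.requeued_for_pod = pod_name
    attempt.requeue_count += 1
    requeued.append(pod_name)  # stand-in for task_queue.put(job)
    return True


attempt, requeued = Attempt(), []
# MODIFIED, MODIFIED, DELETED for pod-a, then a failure of the replacement pod-b.
for event_pod in ["pod-a", "pod-a", "pod-a", "pod-b"]:
    on_failed_event(attempt, event_pod, requeued)
print(requeued)  # prints "['pod-a', 'pod-b']"
```

Because the stored job is never popped, a distinct replacement pod can still be requeued, which is the difference from popping the job right after `put`.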


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

@seanmuth

Contributor Author

Pushed 2352e1f453 addressing the review feedback:

  • Short-circuit when requeues are disabled — the task-instance state DB lookup is now skipped entirely when pod_launch_failure_retries = 0.
  • Consolidated per-key state — the job spec, requeue count, and the pod each requeue was last issued for now live in a single _PodLaunchAttempt per key, cleaned in one place across all exit paths. This also fixes the requeue count leaking when a pod is adopted (the previous separate counter was never cleared on the ADOPTED path).
  • Dedupe duplicate Failed events — Kubernetes can emit several Failed events for one pod (e.g. MODIFIED×2 + DELETED on SIGTERM); each previously triggered another requeue. The executor now records the pod a requeue was issued for and ignores duplicate events for that same pod, while a distinct (requeued) pod still requeues. New test: test_change_state_pre_execution_failure_dedupes_repeated_events.
  • Removed dead RUNNING branch — the watcher never emits RUNNING, so the cleanup there was dead code.
  • Docs / changelog — documented the default-behavior change (pre-execution pod failures are now requeued once before failing; set pod_launch_failure_retries = 0 to restore the previous immediate-fail behavior) and the warning that -1 (unlimited) with a crash-on-launch image accumulates pods under the default delete_worker_pods_on_failure = False.

Full TestKubernetesExecutor suite passes locally (124 passed, 1 skipped). Per-thread replies inline.


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

seanmuth added 4 commits June 30, 2026 16:02
…arted

When a worker pod is destroyed before the task process starts (node drain, autoscaler scale-down, node boot race, transient image pull failure), the task instance is still queued and no task code has run. Reporting this to the scheduler as a normal failure consumes a user-configured retry and raises a misleading failure alert for work that never executed. The executor already has the signal to tell this apart from an execution failure, so it now transparently requeues the pod without consuming a task retry, bounded by pod_launch_failure_retries and excluding container reasons in pod_launch_failure_excluded_container_reasons (default Error).
The tests replaced executor.task_queue with a MagicMock, so the executor.end() teardown looped forever on get_nowait() instead of raising Empty, hitting the 60s CI timeout. Assert the requeue via observable executor state instead of mocking the queue, and pass a valid executor_config to the stash test so execute_async does not bail before recording the job.
Kubernetes can emit several Failed events for a single worker pod (for example two MODIFIED then DELETED on SIGTERM); each one previously triggered another requeue, creating duplicate pods and over-counting attempts. Track the job spec, requeue count, and the pod a requeue was last issued for together in one per-key object so duplicate events for the same pod are ignored and the requeue count no longer leaks when a pod is adopted. Skip the task-instance state lookup when requeues are disabled, and document the default-behavior change and the accumulation risk of setting the retry count to -1.
…oss scheduler restarts

Clarify that the in-memory requeue state is intentionally not persisted and that the dead-scheduler adoption path is a safe no-op for it, so a future reader doesn't mistake the lack of recovery for a bug.
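
The test hang described in the commit notes above (a mocked task_queue never raising Empty) follows from how MagicMock behaves. The snippet below only demonstrates that standard-library behavior; it assumes the teardown drains the queue until Empty is raised, as the note implies, and does not reproduce the executor code.

```python
from queue import Empty, Queue
from unittest.mock import MagicMock

real = Queue()
try:
    real.get_nowait()
    real_raised = False
except Empty:
    real_raised = True  # an empty real queue ends a drain loop

mock = MagicMock()
try:
    item = mock.get_nowait()  # returns another MagicMock and never raises
    mock_raised = False
except Empty:
    mock_raised = True

print(real_raised, mock_raised)  # prints "True False"
```

A loop that exits only on Empty therefore spins forever against the mock, which is why asserting on observable executor state is the safer test design.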
@seanmuth seanmuth force-pushed the feature/k8s-executor-pre-execution-requeue branch from 7bfd8f3 to 6bde46e Compare June 30, 2026 21:05
@seanmuth

Contributor Author

Rebased onto current main to resolve the conflict introduced by #68674 (25c0b3f).

What #68674 changed under this PR: it fixed the never-drained adoption set by converting self.completed from a set to a dict[(namespace, pod_name), KubernetesResults], and reworked sync() to drain it via a keep-failures loop (_change_state is retried next sync if it raises, otherwise the entry is dropped). It did not touch _change_state itself, so the only real textual overlap with this PR was in __init__ (the self.completed declaration sits next to the new requeue state) plus an unrelated coordinator_kube_image arg main had added to KubernetesJob in execute_async.

How I reconciled it:

  • __init__: kept pod_launch_attempts and adopted the new self.completed: dict[(namespace, pod_name), ...] type.
  • execute_async: built the job with the new coordinator_kube_image arg and registered its _PodLaunchAttempt.
  • _change_state / helpers / tests: merged cleanly (non-overlapping regions).

Why it composes correctly (not just a marker resolution):

  • The requeue branch only fires on state == FAILED; adopted completed pods carry state == "completed", so they drain through #68674's new loop untouched.
  • The requeue path returns rather than raising, so a requeued task is dropped from self.completed instead of lingering in the new keep-failures dict.
  • Adopted pods have no pod_launch_attempts entry, so a pre-execution failure on them falls through to a normal fail — consistent with the ephemeral-state semantics documented on pod_launch_attempts.
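
The interaction with the keep-failures loop can be illustrated with a simplified model. `drain` and the toy `change_state` are hypothetical stand-ins for sync() and _change_state; the real loop in #68674 may differ in detail.

```python
def drain(completed: dict, change_state) -> dict:
    """Process every entry; keep only those whose handler raised."""
    kept = {}
    for pod_key, result in completed.items():
        try:
            change_state(result)
        except Exception:
            kept[pod_key] = result  # retried on the next sync
    return kept


def change_state(result):
    if result == "boom":
        raise RuntimeError("transient error")
    # A requeue returns normally, so its entry is dropped like any handled result.


completed = {("ns", "pod-a"): "requeued", ("ns", "pod-b"): "boom"}
completed = drain(completed, change_state)
print(completed)  # prints "{('ns', 'pod-b'): 'boom'}"
```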

Verified locally: ruff clean, full test_kubernetes_executor.py class passes (142 passed, 1 skipped — the pre-3.2 gate).

Rebase performed by Claude (Opus 4.8), reviewed by me.

…equeues'

The pre-execution-failure check ran a metadata-db query for every failed pod, breaking the test_pod_failure_logging_* K8s system tests (no task_instance table) and needlessly querying for adopted/finalized pods. Gate the lookup on an existing pod_launch_attempts entry so only pods this executor launched and still tracks are considered. Also add the plural 'requeues' to the docs spelling wordlist (singular forms were already present).
@seanmuth

seanmuth commented Jul 1, 2026

Contributor Author

Pushed 797256b18c to fix two CI failures from the rebased run:

1. K8s system tests: test_pod_failure_logging_with_container_terminated and test_pod_failure_logging_exception_handling failed with sqlite3.OperationalError: no such table: task_instance. Root cause was mine: the pre-execution-failure check ran a metadata-db lookup (_get_task_instance_state) for every FAILED pod, but those tests call _change_state directly with a FAILED result against a DB with no task_instance table (previously the lookup only ran when state is None).

Fix — and a genuine improvement: reorder the guard so the cheap in-memory pod_launch_attempts.get(key) check runs first, and the DB lookup only happens when there's actually an attempt to requeue. Adopted pods, already-finalized pods, and directly-injected results have no entry, so they skip the query entirely and fall through to the normal fail path.

# Only pods this executor launched and is still tracking can be requeued; checking the
# in-memory attempt first avoids a metadata-db lookup for adopted or already-finalized pods.
attempt = self.pod_launch_attempts.get(key)
if (
    attempt is not None
    and state == TaskInstanceState.FAILED
    and self.pod_launch_failure_max_retries != 0
    and self._is_pre_execution_failure(...)
):
    ...

2. Docs spellcheck: requeues flagged in the cncf-kubernetes changelog. The singular requeue/requeued were already whitelisted; added the plural to docs/spelling_wordlist.txt (per devel-common/src/docs/README.rst, the wordlist is the right place for prose words; the backtick convention is only for class/function names).

Verified locally: ruff clean, prek hooks pass, 61 relevant unit tests pass (including the disabled-skips-lookup and dedupe cases).

Changes by Claude (Opus 4.8), reviewed by me.

self.task_queue is typed Queue | None; guard the requeue put() with a TYPE_CHECKING assert (matching the other call sites in this file) so mypy narrows it without tripping ruff's S101 on a runtime assert.
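
The narrowing pattern mentioned in the commit note above looks roughly like this. The `Executor` class and `requeue` method are hypothetical; only the `Queue | None` attribute type and the `TYPE_CHECKING` guard come from the note.

```python
from __future__ import annotations

from queue import Queue
from typing import TYPE_CHECKING


class Executor:
    def __init__(self) -> None:
        self.task_queue: Queue | None = Queue()

    def requeue(self, job: str) -> None:
        if TYPE_CHECKING:
            # Seen by the type checker (narrows Queue | None to Queue);
            # TYPE_CHECKING is False at runtime, so this never executes.
            assert self.task_queue
        self.task_queue.put(job)


executor = Executor()
executor.requeue("job-1")
print(executor.task_queue.get_nowait())  # prints "job-1"
```

The trade-off is that nothing checks for None at runtime, so the pattern relies on the queue being initialized before the method is called.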
@seanmuth seanmuth force-pushed the feature/k8s-executor-pre-execution-requeue branch from 1a6b59c to 2af7e8a Compare July 1, 2026 21:43

Labels

area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Development

Successfully merging this pull request may close these issues.

KubernetesExecutor: automatically requeue tasks whose pod failed before execution started

3 participants