Skip to content

KubernetesExecutor: self.completed adoption set is never drained#68674

Merged
kaxil merged 3 commits into
apache:mainfrom
ihorlukianov:main-pod-deletion-fix
Jun 30, 2026
Merged

KubernetesExecutor: self.completed adoption set is never drained#68674
kaxil merged 3 commits into
apache:mainfrom
ihorlukianov:main-pod-deletion-fix

Conversation

@ihorlukianov

@ihorlukianov ihorlukianov commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Solves #68683

Airflow version observed 3.2.1

KubernetesExecutor.sync() re-runs _change_state() over the entire self.completed set, and nothing ever removes entries from that set.
Iteration over the self.completed is nested inside the result-queue while True

With delete_worker_pods=False, the data structure only grows, with no removals. So every adopted completed pod is re-PATCHed forever, and the set grows monotonically over the scheduler's lifetime.
With delete_worker_pods=True, the same happens with pod deletion.

The same pod name is deleted many times within seconds:

2026-06-11T18:49:36.108968Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
2026-06-11T18:49:36.228925Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
2026-06-11T18:49:36.397320Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
... (166 total for this one pod)

This starves the scheduler loop: with (in my case ~1,855) scheduled TIs waiting, most of each loop is spent re-deleting finished pods instead of launching new ones.

Expected behaviour

Each finished worker pod should be patched or deleted once. Subsequent scheduler loops should not re-issue calls for pods already processed.

Reproduce

  1. Deploy Airflow 3.2.x with KubernetesExecutor.

  2. Trigger DAGs with many mapped tasks (e.g. 100+ mapped trigger tasks) so pods complete in quick succession.

  3. Inspect scheduler logs:

    kubectl logs <scheduler-pod> -c scheduler | grep "Deleting pod" \
      | sed 's/.*Deleting pod \([^ ]*\) in.*/\1/' | sort | uniq -c | sort -rn | head
  4. Observe the same pod names with delete counts >> 1.

RCA

Introduced/regressed around #55797 (Oct 2025), which added a self.completed set for orphaned completed pod adoption.
In KubernetesExecutor.sync() (providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py):

  1. self.completed is processed inside the while True loop that drains result_queue — for every completion event, all entries in self.completed call _change_state() again, each triggering delete_pod():

    while True:
        results = self.result_queue.get_nowait()
        ...
        self._change_state(results)
    
        for result in self.completed:   # <-- nested inside while True
            self._change_state(result)
  2. self.completed is never cleared after processing, so entries accumulate and are re-processed on every subsequent completion event.

Expected delete volume ≈ num_result_queue_events × (1 + len(self.completed))

Proposed fix

  1. Move for result in self.completed outside the result-queue drain loop (once per sync()).
  2. Clear or discard from self.completed after successful processing.
  3. Deduplicate by pod_name when adopting completed pods.

Expected impact

Before: delete_calls ≈ num_result_events × (1 + len(completed)) per sync().
After: delete_calls ≈ num_result_events + len(completed) per sync(), with completed cleared after processing.


Was generative AI tooling used to co-author this PR?

@FrankYang0529 FrankYang0529 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave some minor comments. You can link issue to #68683.

@ihorlukianov ihorlukianov changed the title KubernetesExecutor repeatedly deletes the same finished worker pods every scheduler loop KubernetesExecutor: self.completed adoption set is never drained Jun 18, 2026
@FrankYang0529

Copy link
Copy Markdown
Member

@ihorlukianov Could you check the CI error? Thank you.

diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 8c7aad5..c82b97a 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1561,12 +1561,8 @@ class TestKubernetesExecutor:
                     None,
                 )
             }
-            executor.result_queue.put(
-                KubernetesResults(queue_key, None, "queue-pod", "default", "2", None)
-            )
-            executor.result_queue.put(
-                KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None)
-            )
+            executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod", "default", "2", None))
+            executor.result_queue.put(KubernetesResults(queue_key, None, "queue-pod-2", "default", "3", None))

@seanmuth

Copy link
Copy Markdown
Contributor

Independently validated this fix on a live Astro KubernetesExecutor deployment (Airflow 3.2.2, astronomer-kubernetes-executor 10.18.0 — which vendors the cncf executor; its sync() / self.completed path is byte-identical to cncf-kubernetes 10.18.1).

Method

self.completed is only populated by _adopt_completed_pods, which adopts status.phase=Succeeded pods that aren't yet marked done and belong to a dead scheduler (the selector excludes the current scheduler and alive siblings). A single steady scheduler never adopts its own completing pods, so the bug doesn't surface under normal operation — it needs orphaned Succeeded-not-done pods.

To trigger it deterministically:

  1. AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS=False so completed pods linger as Succeeded.
  2. Trigger a DAG with 100 mapped tasks (one worker pod each).
  3. ~45s in, restart the scheduler mid-burst. The restart gap leaves a pile of Succeeded-not-done pods that the new scheduler adopts into self.completed.

Measured Patched pod <key> ... to mark it as done occurrences per pod (map_index) in scheduler logs.

Results (identical trigger, only the executor code differs)

adoption fired pods total patch calls max per pod avg run outcome
Unpatched (10.18.0) yes (10 adopt attempts) 100 300 21 3.0 failed
Patched (this PR) yes (24 adopt attempts) 100 124 2 1.24 success

Without the fix, a single adopted pod was re-patched up to 21 times and the count was still climbing (self.completed grows unbounded → scheduler-loop starvation). With the fix, despite the patched run actually adopting more pods (24 vs 10 adoption attempts), re-processing is bounded to ~once per pod (max 2, avg 1.24) because self.completed is drained after each entry is handled. The patched run also recovered through the scheduler restart and completed successfully, whereas the unpatched run failed.

LGTM — the self.completed dict + drain (self.completed = still_pending) behaves as intended on a real deployment.


Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting

@kaxil kaxil merged commit 25c0b3f into apache:main Jun 30, 2026
102 checks passed
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Jun 30, 2026
…che#68674)

* Fix Kubernetes Executor pods deletion storm

* Used dict for better performance; Added UT for delete_worker_pods=False

* Fix formatting
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants