From f0351e8478167c62a074ba5aab0ab60a43c764b5 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 17 May 2026 20:14:09 +0200 Subject: [PATCH] k8s tests: wait for push task in executor before killing scheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_integration_run_dag_with_scheduler_failure is intermittently flaky on ARM CI: the scheduler is killed immediately after start_job_in_kubernetes, so the new scheduler pod (after `kubectl rollout status` returns successfully) sometimes has to handle the very first scheduling step itself before the 40s monitor_task timeout expires. push stays in `queued` for the full 40s and the test fails with `assert 'queued' == 'success'`. Two adjustments: 1. Before killing the scheduler, wait until the `push` task instance has reached a `queued`-or-later state. That way the original scheduler has already handed the task to the executor and the post-restart scheduler only needs to drive the downstream dependency for `puller`, not pick up `push` from scratch. 2. Bump the post-restart monitor_task timeout from 40s to 120s. The previous "fail fast if failing" budget races with scheduler-loop warm-up under load; 120s is still fast for a successful run and gives a clear margin for the legitimate cases. This is a residual flake left after #46502 — the rollout-status wait fixed the worst of it, but the race between "pod is running" and "scheduler loop is actually scheduling" remained. Reopens the spirit of #45145. --- .../tests/kubernetes_tests/test_base.py | 26 +++++++++++++++++++ .../kubernetes_tests/test_other_executors.py | 23 +++++++++++++--- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py index bffc657415380..caa829467185a 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_base.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py @@ -242,6 +242,32 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, print(f"The expected state is wrong {state} != {expected_final_state} (expected)!") assert state == expected_final_state + def wait_until_task_in_executor(self, host, dag_run_id, dag_id, task_id, timeout=60): + """Poll until the task instance has been handed to the executor. + + Once the state is ``queued`` (or any post-queued state), the scheduler + has already pushed the task to the executor queue, so a subsequent + scheduler crash does not race with the very first scheduling step. + """ + deadline = time.monotonic() + timeout + post_queued_states = {"queued", "running", "success", "failed", "upstream_failed", "removed"} + get_string = f"http://{host}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}" + state: str | None = None + while time.monotonic() < deadline: + try: + result = self.session.get(get_string) + if result.status_code == 200: + state = result.json().get("state") + print(f"[wait_until_task_in_executor] {task_id} state={state}") + if state in post_queued_states: + return state + except requests.exceptions.ConnectionError as exc: + print(f"[wait_until_task_in_executor] api call failed, retrying. error={exc}") + time.sleep(2) + raise AssertionError( + f"task {task_id} did not reach a post-queued state within {timeout}s (last seen state: {state})" + ) + @staticmethod def ensure_resource_health( resource_name: str, diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py index f2efcce5bded8..a13e56437f7f6 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py @@ -54,6 +54,18 @@ def test_integration_run_dag_with_scheduler_failure(self): dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host) + # Make sure the first task has already been handed to the executor before + # we kill the scheduler. Otherwise the scheduler-kill races with the very + # first scheduling step, and the post-restart scheduler has to re-pick the + # task itself — making the post-restart monitor timeouts unreliable. + self.wait_until_task_in_executor( + host=self.host, + dag_run_id=dag_run_id, + dag_id=dag_id, + task_id="push", + timeout=60, + ) + self._delete_airflow_pod("scheduler") # Wait for the scheduler to be recreated @@ -65,14 +77,19 @@ def test_integration_run_dag_with_scheduler_failure(self): raise ValueError(f"Unknown executor {EXECUTOR}") self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type) - # Wait some time for the operator to complete + # `push` is already in the executor at this point, but `kubectl rollout + # status` returns when the new scheduler pod is *running*, not when the + # scheduler loop has resumed processing. Give the worker / new scheduler + # enough headroom to drive push → success and then schedule the + # downstream puller. 40s used to be the "fail fast" budget — in + # practice that races with scheduler-loop warm-up. self.monitor_task( host=self.host, dag_run_id=dag_run_id, dag_id=dag_id, task_id="push", expected_final_state="success", - timeout=40, # This should fail fast if failing + timeout=120, ) self.monitor_task( @@ -81,7 +98,7 @@ def test_integration_run_dag_with_scheduler_failure(self): dag_id=dag_id, task_id="puller", expected_final_state="success", - timeout=40, + timeout=120, ) self.ensure_dag_expected_state(