From 67a1667724306c3ce6a55cc58646b55ee96453e2 Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Mon, 25 May 2026 06:01:03 +0000 Subject: [PATCH] Fix flaky OTel integration tests by bounding scheduler shutdown wait Replace unbounded subprocess.wait() calls with a 30-second grace period followed by SIGKILL, and raise execution_timeout from 90s to 160s on all three OTel integration test methods. The 90s limit did not account for the fixed 10s startup sleep, 10s post-run sleep, and OTel atexit metric flush (up to 10s via force_flush), leaving no budget for shutdown on slow CI runs. The new 160s ceiling covers the 140s expected worst-case path (10s startup + 90s dag-run wait + 10s post-run sleep + 30s shutdown grace) with a 20s buffer. Note that the scheduler and the API server are terminated sequentially, each with its own 30s grace period, so this budget assumes that at most one of them needs its full grace period. --- .../tests/integration/otel/test_otel.py | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 05c7ea5638ef7..a6af896b4374b 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -357,17 +357,13 @@ def dag_execution_for_testing_metrics(self, capfd): finally: # Terminate the processes. - scheduler_process.terminate() - scheduler_process.wait() - + self._terminate_process(scheduler_process) scheduler_status = scheduler_process.poll() assert scheduler_status is not None, ( "The scheduler_1 process status is None, which means that it hasn't terminated as expected." ) - apiserver_process.terminate() - apiserver_process.wait() - + self._terminate_process(apiserver_process) apiserver_status = apiserver_process.poll() assert apiserver_status is not None, ( "The apiserver process status is None, which means that it hasn't terminated as expected." 
@@ -387,7 +383,8 @@ def _get_ti(self, dag_id: str, run_id: str, task_id: str) -> Any | None: ) return ti - @pytest.mark.execution_timeout(90) + # 160s = 10s startup + 90s dag-run wait + 10s post-run sleep + 30s shutdown grace + 20s CI buffer + @pytest.mark.execution_timeout(160) @pytest.mark.parametrize( ("legacy_names_on_bool", "legacy_names_exported"), [ @@ -423,7 +420,7 @@ def test_export_legacy_metric_names(self, legacy_names_on_bool, legacy_names_exp if legacy_names_exported: assert set(legacy_metric_names).issubset(metrics_dict.keys()) - @pytest.mark.execution_timeout(90) + @pytest.mark.execution_timeout(160) def test_export_metrics_during_process_shutdown(self, capfd): out, dag = self.dag_execution_for_testing_metrics(capfd) @@ -442,7 +439,7 @@ def test_export_metrics_during_process_shutdown(self, capfd): assert set(metrics_to_check).issubset(metrics_dict.keys()) - @pytest.mark.execution_timeout(90) + @pytest.mark.execution_timeout(160) @pytest.mark.parametrize( ("task_span_detail_level", "expected_hierarchy"), [ @@ -520,17 +517,13 @@ def test_dag_execution_succeeds(self, capfd, task_span_detail_level, expected_hi dump_airflow_metadata_db(session) # Terminate the processes. - scheduler_process.terminate() - scheduler_process.wait() - + self._terminate_process(scheduler_process) scheduler_status = scheduler_process.poll() assert scheduler_status is not None, ( "The scheduler_1 process status is None, which means that it hasn't terminated as expected." ) - apiserver_process.terminate() - apiserver_process.wait() - + self._terminate_process(apiserver_process) apiserver_status = apiserver_process.poll() assert apiserver_status is not None, ( "The apiserver process status is None, which means that it hasn't terminated as expected." 
@@ -572,6 +565,17 @@ def get_parent_span_id(span): nested = get_span_hierarchy() assert nested == expected_hierarchy + @staticmethod + def _terminate_process(proc: subprocess.Popen, timeout: int = 30) -> None: + # Grace period covers OTel atexit flush (force_flush default: 10s); + # SIGKILL is the fallback if the process is still alive after timeout. + proc.terminate() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + def start_scheduler(self, capture_output: bool = False): stdout = None if capture_output else subprocess.DEVNULL stderr = None if capture_output else subprocess.DEVNULL