diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py index d453d94e26605..44fdd3b29e245 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_base.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py @@ -24,6 +24,7 @@ from datetime import datetime, timezone from pathlib import Path from subprocess import check_call, check_output +from typing import Literal import pytest import requests @@ -65,7 +66,7 @@ def base_tests_setup(self, request): # Replacement for unittests.TestCase.id() self.test_id = f"{request.node.cls.__name__}_{request.node.name}" # Ensure the api-server deployment is healthy at kubernetes level before calling the any API - self.ensure_deployment_health("airflow-api-server") + self.ensure_resource_health("airflow-api-server") try: self.session = self._get_session_with_retries() self._ensure_airflow_api_server_is_healthy() @@ -227,12 +228,24 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, assert state == expected_final_state @staticmethod - def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"): - """Watch the deployment until it is healthy.""" - deployment_rollout_status = check_output( - ["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"] + def ensure_resource_health( + resource_name: str, + namespace: str = "airflow", + resource_type: Literal["deployment", "statefulset"] = "deployment", + ): + """Watch the resource until it is healthy. + Args: + resource_name (str): Name of the resource to check. + resource_type (str): Type of the resource (e.g., deployment, statefulset). + namespace (str): Kubernetes namespace where the resource is located. + """ + rollout_status = check_output( + ["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"], ).decode() - assert "successfully rolled out" in deployment_rollout_status + if resource_type == "deployment": + assert "successfully rolled out" in rollout_status + else: + assert "roll out complete" in rollout_status def ensure_dag_expected_state(self, host, logical_date, dag_id, expected_final_state, timeout): tries = 0 diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py index b3898f460274a..c7e1322bc7707 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py @@ -81,7 +81,7 @@ def test_integration_run_dag_with_scheduler_failure(self): dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host) self._delete_airflow_pod("scheduler") - self.ensure_deployment_health("airflow-scheduler") + self.ensure_resource_health("airflow-scheduler") # Wait some time for the operator to complete self.monitor_task( diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py index 327e252825a37..7b49bf9f5639b 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py @@ -29,10 +29,6 @@ # Also, the skipping is necessary as there's no gain in running these tests in KubernetesExecutor @pytest.mark.skipif(EXECUTOR == "KubernetesExecutor", reason="Does not run on KubernetesExecutor") class TestCeleryAndLocalExecutor(BaseK8STest): - @pytest.mark.xfail( - EXECUTOR == "LocalExecutor", - reason="https://github.com/apache/airflow/issues/47518 needs to be fixed", - ) def test_integration_run_dag(self): dag_id = "example_bash_operator" dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host) @@ -56,17 +52,21 @@ def test_integration_run_dag(self): timeout=300, ) - @pytest.mark.xfail( - EXECUTOR == "LocalExecutor", - reason="https://github.com/apache/airflow/issues/47518 needs to be fixed", - ) def test_integration_run_dag_with_scheduler_failure(self): dag_id = "example_xcom" dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host) self._delete_airflow_pod("scheduler") - self.ensure_deployment_health("airflow-scheduler") + + # Wait for the scheduler to be recreated + if EXECUTOR == "CeleryExecutor": + scheduler_resource_type = "deployment" + elif EXECUTOR == "LocalExecutor": + scheduler_resource_type = "statefulset" + else: + raise ValueError(f"Unknown executor {EXECUTOR}") + self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type) # Wait some time for the operator to complete self.monitor_task(