diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index c160bb91d5975..c7c3071ed0c08 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -73,7 +73,18 @@ class CeleryKubernetesExecutor(BaseExecutor): def kubernetes_queue(self) -> str: return conf.get("celery_kubernetes_executor", "kubernetes_queue") - def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor): + def __init__( + self, + celery_executor: CeleryExecutor | None = None, + kubernetes_executor: KubernetesExecutor | None = None, + ): + if AIRFLOW_V_3_0_PLUS or not kubernetes_executor or not celery_executor: + raise RuntimeError( + f"{self.__class__.__name__} does not support Airflow 3.0+. See " + "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" + " how to use multiple executors concurrently." + ) + super().__init__() self._job_id: int | str | None = None self.celery_executor = celery_executor @@ -130,13 +141,6 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start celery and kubernetes executor.""" - if AIRFLOW_V_3_0_PLUS: - raise RuntimeError( - f"{self.__class__.__name__} does not support Airflow 3.0+. See " - "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" - " how to use multiple executors concurrently." - ) - self.celery_executor.start() self.kubernetes_executor.start() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 6feb2299bb0a2..a05b8b63601aa 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -62,7 +62,18 @@ class LocalKubernetesExecutor(BaseExecutor): KUBERNETES_QUEUE = conf.get("local_kubernetes_executor", "kubernetes_queue") - def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor): + def __init__( + self, + local_executor: LocalExecutor | None = None, + kubernetes_executor: KubernetesExecutor | None = None, + ): + if AIRFLOW_V_3_0_PLUS or not local_executor or not kubernetes_executor: + raise RuntimeError( + f"{self.__class__.__name__} does not support Airflow 3.0+. See " + "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" + " how to use multiple executors concurrently." + ) + super().__init__() self._job_id: int | str | None = None self.local_executor = local_executor @@ -120,13 +131,6 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start local and kubernetes executor.""" - if AIRFLOW_V_3_0_PLUS: - raise RuntimeError( - f"{self.__class__.__name__} does not support Airflow 3.0+. See " - "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" - " how to use multiple executors concurrently." - ) - self.log.info("Starting local and Kubernetes Executor") self.local_executor.start() self.kubernetes_executor.start()