From 5b0b60def18a3b91a2846897b449b07cfe01bf49 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Mon, 3 Mar 2025 15:58:02 -0800 Subject: [PATCH 1/2] Remove core Airflow support for static hybrid executors Remove all the handholding and custom logic we have in core airflow which allows the use of static hybrid executors like LocalKubernetesExecutor and CeleryKubernetesExecutor. These executors will still work on 2.X versions of Airflow, but moving forward they will not be supported on Airflow 3 --- airflow/config_templates/config.yml | 3 +- airflow/executors/executor_constants.py | 4 -- airflow/executors/executor_loader.py | 37 +++---------------- airflow/settings.py | 6 +-- .../local_commands/test_scheduler_command.py | 1 - .../local_commands/test_standalone_command.py | 6 --- tests/cli/conftest.py | 10 +---- tests/cli/test_cli_parser.py | 4 -- tests/executors/test_executor_loader.py | 2 - tests/sensors/test_base.py | 8 ---- tests/utils/test_log_handlers.py | 4 -- 11 files changed, 9 insertions(+), 76 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 6a57fc709874a..be33e234a7e0a 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -71,8 +71,7 @@ core: description: | The executor class that airflow should use. Choices include ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, - ``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the - full import path to the class when using a custom executor. + ``KubernetesExecutor`` or the full import path to the class when using a custom executor. version_added: ~ type: string example: ~ diff --git a/airflow/executors/executor_constants.py b/airflow/executors/executor_constants.py index 65d814f28ac8f..5d752e23233d3 100644 --- a/airflow/executors/executor_constants.py +++ b/airflow/executors/executor_constants.py @@ -28,19 +28,15 @@ class ConnectorSource(Enum): LOCAL_EXECUTOR = "LocalExecutor" -LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor" SEQUENTIAL_EXECUTOR = "SequentialExecutor" CELERY_EXECUTOR = "CeleryExecutor" -CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor" KUBERNETES_EXECUTOR = "KubernetesExecutor" DEBUG_EXECUTOR = "DebugExecutor" MOCK_EXECUTOR = "MockExecutor" CORE_EXECUTOR_NAMES = { LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, MOCK_EXECUTOR, diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 6d6b8d115bcc1..80fed5c727871 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -25,12 +25,10 @@ from airflow.exceptions import AirflowConfigException, UnknownExecutorException from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, CORE_EXECUTOR_NAMES, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ConnectorSource, ) @@ -59,12 +57,8 @@ class ExecutorLoader: executors = { LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor", - LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes." - "executors.local_kubernetes_executor.LocalKubernetesExecutor", SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor", CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor", - CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery." - "executors.celery_kubernetes_executor.CeleryKubernetesExecutor", KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes." "executors.kubernetes_executor.KubernetesExecutor", DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor", @@ -265,17 +259,12 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor _executor_name = executor_name try: - if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR: - executor = cls.__load_celery_kubernetes_executor() - elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR: - executor = cls.__load_local_kubernetes_executor() + executor_cls, import_source = cls.import_executor_cls(_executor_name) + log.debug("Loading executor %s from %s", _executor_name, import_source.value) + if _executor_name.team_id: + executor = executor_cls(team_id=_executor_name.team_id) else: - executor_cls, import_source = cls.import_executor_cls(_executor_name) - log.debug("Loading executor %s from %s", _executor_name, import_source.value) - if _executor_name.team_id: - executor = executor_cls(team_id=_executor_name.team_id) - else: - executor = executor_cls() + executor = executor_cls() except ImportError as e: log.error(e) @@ -315,19 +304,3 @@ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSourc executor_name = cls.get_default_executor_name() executor, source = cls.import_executor_cls(executor_name) return executor, source - - @classmethod - def __load_celery_kubernetes_executor(cls) -> BaseExecutor: - celery_executor = import_string(cls.executors[CELERY_EXECUTOR])() - kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])() - - celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR]) - return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor) - - @classmethod - def __load_local_kubernetes_executor(cls) -> BaseExecutor: - local_executor = import_string(cls.executors[LOCAL_EXECUTOR])() - kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])() - - local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR]) - return local_kubernetes_executor_cls(local_executor, kubernetes_executor) diff --git a/airflow/settings.py b/airflow/settings.py index fb9450ef741c2..307ee1e668a33 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -687,11 +687,7 @@ def initialize(): LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True) # Determines if the executor utilizes Kubernetes -IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in { - executor_constants.KUBERNETES_EXECUTOR, - executor_constants.CELERY_KUBERNETES_EXECUTOR, - executor_constants.LOCAL_KUBERNETES_EXECUTOR, -} +IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") == executor_constants.KUBERNETES_EXECUTOR # Executors can set this to true to configure logging correctly for # containerized executors. diff --git a/tests/cli/commands/local_commands/test_scheduler_command.py b/tests/cli/commands/local_commands/test_scheduler_command.py index 2dfce8edde6c6..8da375b2b1472 100644 --- a/tests/cli/commands/local_commands/test_scheduler_command.py +++ b/tests/cli/commands/local_commands/test_scheduler_command.py @@ -45,7 +45,6 @@ def setup_class(cls): ("LocalExecutor", True), ("SequentialExecutor", True), ("KubernetesExecutor", False), - ("LocalKubernetesExecutor", True), ], ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") diff --git a/tests/cli/commands/local_commands/test_standalone_command.py b/tests/cli/commands/local_commands/test_standalone_command.py index 464e0d3aec2f3..484596529f278 100644 --- a/tests/cli/commands/local_commands/test_standalone_command.py +++ b/tests/cli/commands/local_commands/test_standalone_command.py @@ -26,11 +26,9 @@ from airflow.executors import executor_loader from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ) @@ -40,17 +38,13 @@ class TestStandaloneCommand: "conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor", [ (LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR), - (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), - (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), - (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR), (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), - (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), ], diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 17f891c94f88b..5c46e0aa01bb0 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -23,8 +23,8 @@ from airflow.executors import local_executor from airflow.models.dagbag import DagBag -from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor -from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor +from airflow.providers.celery.executors import celery_executor +from airflow.providers.cncf.kubernetes.executors import kubernetes_executor from tests_common.test_utils.config import conf_vars @@ -33,15 +33,9 @@ custom_executor_module.CustomCeleryExecutor = type( # type: ignore "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {} ) -custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore - "CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {} -) custom_executor_module.CustomLocalExecutor = type( # type: ignore "CustomLocalExecutor", (local_executor.LocalExecutor,), {} ) -custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore - "CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {} -) custom_executor_module.CustomKubernetesExecutor = type( # type: ignore "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {} ) diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py index 3bfbaeb23bd1f..d50914c3adc87 100644 --- a/tests/cli/test_cli_parser.py +++ b/tests/cli/test_cli_parser.py @@ -365,16 +365,12 @@ def test_executor_specific_commands_not_accessible(self, command): "executor,expected_args", [ ("CeleryExecutor", ["celery"]), - ("CeleryKubernetesExecutor", ["celery", "kubernetes"]), ("KubernetesExecutor", ["kubernetes"]), ("LocalExecutor", []), - ("LocalKubernetesExecutor", ["kubernetes"]), ("SequentialExecutor", []), # custom executors are mapped to the regular ones in `conftest.py` ("custom_executor.CustomLocalExecutor", []), - ("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]), ("custom_executor.CustomCeleryExecutor", ["celery"]), - ("custom_executor.CustomCeleryKubernetesExecutor", ["celery", "kubernetes"]), ("custom_executor.CustomKubernetesExecutor", ["kubernetes"]), ], ) diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index de6703954b10d..ebe5ae6c40975 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -45,7 +45,6 @@ def test_no_executor_configured(self): "executor_name", [ "CeleryExecutor", - "CeleryKubernetesExecutor", "DebugExecutor", "KubernetesExecutor", "LocalExecutor", @@ -287,7 +286,6 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, ("executor_config", "expected_value"), [ ("CeleryExecutor", "CeleryExecutor"), - ("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"), ("DebugExecutor", "DebugExecutor"), ("KubernetesExecutor", "KubernetesExecutor"), ("LocalExecutor", "LocalExecutor"), diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 970960d7f1b2f..f7e1021ecb142 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -35,11 +35,9 @@ from airflow.executors.debug_executor import DebugExecutor from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ) from airflow.executors.local_executor import LocalExecutor @@ -48,9 +46,7 @@ from airflow.models.trigger import TriggerFailureReason from airflow.models.xcom import XCom from airflow.providers.celery.executors.celery_executor import CeleryExecutor -from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor -from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep @@ -1306,20 +1302,16 @@ def test_sensor_with_xcom_fails(self, make_sensor): "executor_cls_mode", [ (CeleryExecutor, "poke"), - (CeleryKubernetesExecutor, "poke"), (DebugExecutor, "reschedule"), (KubernetesExecutor, "poke"), (LocalExecutor, "poke"), - (LocalKubernetesExecutor, "poke"), (SequentialExecutor, "poke"), ], ids=[ CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ], ) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f1245b863636b..fc739f01a0349 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -199,8 +199,6 @@ def task_callable(ti): @pytest.mark.parametrize( "executor_name", [ - (executor_constants.LOCAL_KUBERNETES_EXECUTOR), - (executor_constants.CELERY_KUBERNETES_EXECUTOR), (executor_constants.KUBERNETES_EXECUTOR), (None), ], @@ -209,8 +207,6 @@ def task_callable(ti): { ("core", "EXECUTOR"): ",".join( [ - executor_constants.LOCAL_KUBERNETES_EXECUTOR, - executor_constants.CELERY_KUBERNETES_EXECUTOR, executor_constants.KUBERNETES_EXECUTOR, ] ), From a1e9e9052bc68e01b979af9766118361995896fb Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Wed, 5 Mar 2025 14:57:41 -0800 Subject: [PATCH 2/2] Reformat test --- tests/utils/test_log_handlers.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index fc739f01a0349..80a20bf66e465 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -205,11 +205,7 @@ def task_callable(ti): ) @conf_vars( { - ("core", "EXECUTOR"): ",".join( - [ - executor_constants.KUBERNETES_EXECUTOR, - ] - ), + ("core", "EXECUTOR"): executor_constants.KUBERNETES_EXECUTOR, } ) @patch(