diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 46f11e34ab80e..358862948bd2e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -71,8 +71,7 @@ core: description: | The executor class that airflow should use. Choices include ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, - ``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the - full import path to the class when using a custom executor. + ``KubernetesExecutor`` or the full import path to the class when using a custom executor. version_added: ~ type: string example: ~ diff --git a/airflow/executors/executor_constants.py b/airflow/executors/executor_constants.py index 65d814f28ac8f..5d752e23233d3 100644 --- a/airflow/executors/executor_constants.py +++ b/airflow/executors/executor_constants.py @@ -28,19 +28,15 @@ class ConnectorSource(Enum): LOCAL_EXECUTOR = "LocalExecutor" -LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor" SEQUENTIAL_EXECUTOR = "SequentialExecutor" CELERY_EXECUTOR = "CeleryExecutor" -CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor" KUBERNETES_EXECUTOR = "KubernetesExecutor" DEBUG_EXECUTOR = "DebugExecutor" MOCK_EXECUTOR = "MockExecutor" CORE_EXECUTOR_NAMES = { LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, MOCK_EXECUTOR, diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 6d6b8d115bcc1..80fed5c727871 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -25,12 +25,10 @@ from airflow.exceptions import AirflowConfigException, UnknownExecutorException from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, CORE_EXECUTOR_NAMES, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ConnectorSource, ) @@ -59,12 +57,8 @@ class ExecutorLoader: executors = { LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor", - LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes." - "executors.local_kubernetes_executor.LocalKubernetesExecutor", SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor", CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor", - CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery." - "executors.celery_kubernetes_executor.CeleryKubernetesExecutor", KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes." "executors.kubernetes_executor.KubernetesExecutor", DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor", @@ -265,17 +259,12 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor _executor_name = executor_name try: - if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR: - executor = cls.__load_celery_kubernetes_executor() - elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR: - executor = cls.__load_local_kubernetes_executor() + executor_cls, import_source = cls.import_executor_cls(_executor_name) + log.debug("Loading executor %s from %s", _executor_name, import_source.value) + if _executor_name.team_id: + executor = executor_cls(team_id=_executor_name.team_id) else: - executor_cls, import_source = cls.import_executor_cls(_executor_name) - log.debug("Loading executor %s from %s", _executor_name, import_source.value) - if _executor_name.team_id: - executor = executor_cls(team_id=_executor_name.team_id) - else: - executor = executor_cls() + executor = executor_cls() except ImportError as e: log.error(e) @@ -315,19 +304,3 @@ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSourc executor_name = cls.get_default_executor_name() executor, source = cls.import_executor_cls(executor_name) return executor, source - - @classmethod - def __load_celery_kubernetes_executor(cls) -> BaseExecutor: - celery_executor = import_string(cls.executors[CELERY_EXECUTOR])() - kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])() - - celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR]) - return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor) - - @classmethod - def __load_local_kubernetes_executor(cls) -> BaseExecutor: - local_executor = import_string(cls.executors[LOCAL_EXECUTOR])() - kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])() - - local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR]) - return local_kubernetes_executor_cls(local_executor, kubernetes_executor) diff --git a/airflow/settings.py b/airflow/settings.py index fb9450ef741c2..307ee1e668a33 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -687,11 +687,7 @@ def initialize(): LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True) # Determines if the executor utilizes Kubernetes -IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in { - executor_constants.KUBERNETES_EXECUTOR, - executor_constants.CELERY_KUBERNETES_EXECUTOR, - executor_constants.LOCAL_KUBERNETES_EXECUTOR, -} +IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") == executor_constants.KUBERNETES_EXECUTOR # Executors can set this to true to configure logging correctly for # containerized executors. diff --git a/tests/cli/commands/local_commands/test_scheduler_command.py b/tests/cli/commands/local_commands/test_scheduler_command.py index 2dfce8edde6c6..8da375b2b1472 100644 --- a/tests/cli/commands/local_commands/test_scheduler_command.py +++ b/tests/cli/commands/local_commands/test_scheduler_command.py @@ -45,7 +45,6 @@ def setup_class(cls): ("LocalExecutor", True), ("SequentialExecutor", True), ("KubernetesExecutor", False), - ("LocalKubernetesExecutor", True), ], ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") diff --git a/tests/cli/commands/local_commands/test_standalone_command.py b/tests/cli/commands/local_commands/test_standalone_command.py index 464e0d3aec2f3..484596529f278 100644 --- a/tests/cli/commands/local_commands/test_standalone_command.py +++ b/tests/cli/commands/local_commands/test_standalone_command.py @@ -26,11 +26,9 @@ from airflow.executors import executor_loader from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ) @@ -40,17 +38,13 @@ class TestStandaloneCommand: "conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor", [ (LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR), - (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), - (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), - (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR), (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), - (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), ], diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 17f891c94f88b..5c46e0aa01bb0 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -23,8 +23,8 @@ from airflow.executors import local_executor from airflow.models.dagbag import DagBag -from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor -from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor +from airflow.providers.celery.executors import celery_executor +from airflow.providers.cncf.kubernetes.executors import kubernetes_executor from tests_common.test_utils.config import conf_vars @@ -33,15 +33,9 @@ custom_executor_module.CustomCeleryExecutor = type( # type: ignore "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {} ) -custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore - "CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {} -) custom_executor_module.CustomLocalExecutor = type( # type: ignore "CustomLocalExecutor", (local_executor.LocalExecutor,), {} ) -custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore - "CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {} -) custom_executor_module.CustomKubernetesExecutor = type( # type: ignore "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {} ) diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py index 3bfbaeb23bd1f..d50914c3adc87 100644 --- a/tests/cli/test_cli_parser.py +++ b/tests/cli/test_cli_parser.py @@ -365,16 +365,12 @@ def test_executor_specific_commands_not_accessible(self, command): "executor,expected_args", [ ("CeleryExecutor", ["celery"]), - ("CeleryKubernetesExecutor", ["celery", "kubernetes"]), ("KubernetesExecutor", ["kubernetes"]), ("LocalExecutor", []), - ("LocalKubernetesExecutor", ["kubernetes"]), ("SequentialExecutor", []), # custom executors are mapped to the regular ones in `conftest.py` ("custom_executor.CustomLocalExecutor", []), - ("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]), ("custom_executor.CustomCeleryExecutor", ["celery"]), - ("custom_executor.CustomCeleryKubernetesExecutor", ["celery", "kubernetes"]), ("custom_executor.CustomKubernetesExecutor", ["kubernetes"]), ], ) diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index de6703954b10d..ebe5ae6c40975 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -45,7 +45,6 @@ def test_no_executor_configured(self): "executor_name", [ "CeleryExecutor", - "CeleryKubernetesExecutor", "DebugExecutor", "KubernetesExecutor", "LocalExecutor", @@ -287,7 +286,6 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, ("executor_config", "expected_value"), [ ("CeleryExecutor", "CeleryExecutor"), - ("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"), ("DebugExecutor", "DebugExecutor"), ("KubernetesExecutor", "KubernetesExecutor"), ("LocalExecutor", "LocalExecutor"), diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 970960d7f1b2f..f7e1021ecb142 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -35,11 +35,9 @@ from airflow.executors.debug_executor import DebugExecutor from airflow.executors.executor_constants import ( CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ) from airflow.executors.local_executor import LocalExecutor @@ -48,9 +46,7 @@ from airflow.models.trigger import TriggerFailureReason from airflow.models.xcom import XCom from airflow.providers.celery.executors.celery_executor import CeleryExecutor -from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor -from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep @@ -1306,20 +1302,16 @@ def test_sensor_with_xcom_fails(self, make_sensor): "executor_cls_mode", [ (CeleryExecutor, "poke"), - (CeleryKubernetesExecutor, "poke"), (DebugExecutor, "reschedule"), (KubernetesExecutor, "poke"), (LocalExecutor, "poke"), - (LocalKubernetesExecutor, "poke"), (SequentialExecutor, "poke"), ], ids=[ CELERY_EXECUTOR, - CELERY_KUBERNETES_EXECUTOR, DEBUG_EXECUTOR, KUBERNETES_EXECUTOR, LOCAL_EXECUTOR, - LOCAL_KUBERNETES_EXECUTOR, SEQUENTIAL_EXECUTOR, ], ) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f1245b863636b..80a20bf66e465 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -199,21 +199,13 @@ def task_callable(ti): @pytest.mark.parametrize( "executor_name", [ - (executor_constants.LOCAL_KUBERNETES_EXECUTOR), - (executor_constants.CELERY_KUBERNETES_EXECUTOR), (executor_constants.KUBERNETES_EXECUTOR), (None), ], ) @conf_vars( { - ("core", "EXECUTOR"): ",".join( - [ - executor_constants.LOCAL_KUBERNETES_EXECUTOR, - executor_constants.CELERY_KUBERNETES_EXECUTOR, - executor_constants.KUBERNETES_EXECUTOR, - ] - ), + ("core", "EXECUTOR"): executor_constants.KUBERNETES_EXECUTOR, } ) @patch(