From 1e9a18eae9577f1cd65f2de86e79875f76677ea4 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 19 Jun 2026 12:15:46 +0100 Subject: [PATCH] Fix KubernetesExecutor leaking a Manager process when reading running task logs The API server constructs a KubernetesExecutor solely to call get_task_log() for RUNNING tasks (via FileTaskHandler -> ExecutorLoader.get_default_executor) and never starts or ends it. KubernetesExecutor.__init__ eagerly created a multiprocessing.Manager(), which forks a serve_forever child process. Because that instance is cached per process and never shut down, the Manager child is orphaned -- one leaked process (~350-400 MB resident) per API-server worker, growing with worker recycling and pushing the API server toward OOM. get_task_log() only needs the kube client and pod namespace; it never touches the task/result queues. Create the Manager and its queues lazily in start() (the scheduling loop is their only consumer), mirroring how LocalExecutor already defers process/queue creation. end() now no-ops when the executor was never started. Constructing the executor for log reading no longer spawns a Manager. --- .../executors/kubernetes_executor.py | 25 +++++- .../executors/test_kubernetes_executor.py | 90 ++++++++++++++++++- 2 files changed, 108 insertions(+), 7 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 0dcc01537cb78..c9d206fc68cef 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -59,6 +59,7 @@ if TYPE_CHECKING: from collections.abc import Sequence + from multiprocessing.managers import SyncManager from kubernetes import client from kubernetes.client import models as k8s @@ -101,9 +102,14 @@ def __init__(self, *args, **kwargs): # Override parallelism with team-aware config value self.parallelism = self.kube_config.parallelism - self._manager = multiprocessing.Manager() - self.task_queue: Queue[KubernetesJob] = self._manager.JoinableQueue() - self.result_queue: Queue[KubernetesResults] = self._manager.JoinableQueue() + # The multiprocessing.Manager() (and the queues it backs) is only needed once the + # scheduler actually runs the executor, so it is created lazily in start(). Constructing + # the executor without starting it -- as the API server does to call get_task_log() for a + # RUNNING task -- must not spawn a Manager process, otherwise that serve_forever child is + # orphaned and leaks (one per API-server worker). + self._manager: SyncManager | None = None + self.task_queue: Queue[KubernetesJob] | None = None + self.result_queue: Queue[KubernetesResults] | None = None self.kube_scheduler: AirflowKubernetesScheduler | None = None self.kube_client: client.CoreV1Api | None = None self.scheduler_job_id: str | None = None @@ -184,6 +190,9 @@ def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]: def start(self) -> None: """Start the executor.""" self.log.info("Start Kubernetes executor") + self._manager = multiprocessing.Manager() + self.task_queue = self._manager.JoinableQueue() + self.result_queue = self._manager.JoinableQueue() self.scheduler_job_id = str(self.job_id) self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ( @@ -866,10 +875,15 @@ def _flush_result_queue(self) -> None: def end(self) -> None: """Shut down the executor.""" + if self._manager is None: + # start() was never called (e.g. the executor was only constructed to read task + # logs), so there is no Manager process or queues to shut down. + return if TYPE_CHECKING: assert self.task_queue assert self.result_queue assert self.kube_scheduler + assert self._manager self.log.info("Shutting down Kubernetes executor") try: @@ -890,6 +904,11 @@ def end(self) -> None: except Exception: self.log.exception("Unknown error while flushing task queue and result queue.") self._manager.shutdown() + # Return to the unstarted state so a second end() is a no-op (the guard above) and the + # Manager/queues are recreated cleanly if start() is ever called again. + self._manager = None + self.task_queue = None + self.result_queue = None def terminate(self): """Terminate the executor is not doing anything.""" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..4355b7f07fa4b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -582,6 +582,8 @@ def test_run_next_exception_requeue( kubernetes_executor = self.kubernetes_executor kubernetes_executor.task_publish_max_retries = task_publish_max_retries kubernetes_executor.start() + task_queue = kubernetes_executor.task_queue + assert task_queue is not None try: # Execute a task while the Api Throws errors try_number = 1 @@ -596,7 +598,7 @@ def test_run_next_exception_requeue( assert mock_kube_client.create_namespaced_pod.call_count == 1 # type: ignore[attr-defined] if should_requeue: - assert not kubernetes_executor.task_queue.empty() + assert not task_queue.empty() # Disable the ApiException if task_expected_state == State.SUCCESS or task_expected_state == State.QUEUED: @@ -610,11 +612,11 @@ def test_run_next_exception_requeue( kubernetes_executor.sync() assert mock_kube_client.create_namespaced_pod.called # type: ignore[attr-defined] - assert kubernetes_executor.task_queue.empty() + assert task_queue.empty() if task_expected_state != State.SUCCESS: assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state else: - assert kubernetes_executor.task_queue.empty() + assert task_queue.empty() assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state finally: kubernetes_executor.end() @@ -1726,6 +1728,72 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat "Reading from k8s pod logs failed: error_fetching_pod_log", ] + def test_init_does_not_create_manager_process(self): + """ + Constructing the executor must not spawn a ``multiprocessing.Manager``. + + The API server builds a ``KubernetesExecutor`` purely to call ``get_task_log()`` for + RUNNING tasks and never starts it. Eagerly creating the Manager in ``__init__`` leaked an + orphaned ``serve_forever`` process per API-server worker. + """ + executor = KubernetesExecutor() + + assert executor._manager is None + assert executor.task_queue is None + assert executor.result_queue is None + + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_get_task_log_does_not_create_manager_process( + self, mock_get_kube_client, create_task_instance_of_operator + ): + """Reading a running task's logs must not spawn a ``multiprocessing.Manager``.""" + mock_kube_client = mock_get_kube_client.return_value + mock_kube_client.read_namespaced_pod_log.return_value = [b"a_"] + mock_pod = mock.Mock(spec=k8s.V1Pod) + mock_pod.metadata.name = "x" + mock_kube_client.list_namespaced_pod.return_value.items = [mock_pod] + ti = create_task_instance_of_operator(EmptyOperator, dag_id="test_k8s_log_dag", task_id="test_task") + + executor = KubernetesExecutor() + executor.get_task_log(ti=ti, try_number=1) + + assert executor._manager is None + + def test_end_without_start_is_noop(self): + """``end()`` on an executor that was never started must not raise.""" + executor = KubernetesExecutor() + + # Must not raise even though no Manager/queues were ever created. + executor.end() + + assert executor._manager is None + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_start_creates_manager_and_queues(self, mock_watcher, mock_client, mock_kube_client): + """``start()`` creates the Manager and queues; ``end()`` tears them down idempotently.""" + executor = KubernetesExecutor() + executor.job_id = 1 + try: + executor.start() + + assert executor._manager is not None + assert executor.task_queue is not None + assert executor.result_queue is not None + finally: + executor.end() + + # end() returns the executor to the unstarted state, so a second end() is a safe no-op. + assert executor._manager is None + assert executor.task_queue is None + assert executor.result_queue is None + executor.end() + @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration") def test_sentry_integration(self): assert not KubernetesExecutor.sentry_integration @@ -2239,16 +2307,30 @@ def test_executor_with_team_name(self): assert executor.team_name == "ml_team" assert executor.kube_config is not None + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires Airflow 3.2+") - def test_multiple_team_executors_isolation(self, monkeypatch): + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client") + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + def test_multiple_team_executors_isolation( + self, mock_watcher, mock_client, mock_kube_client, monkeypatch + ): """Test that multiple team executors can coexist with isolated resources.""" monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "4") monkeypatch.setenv("AIRFLOW__TEAM_B___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "8") team_a_executor = KubernetesExecutor(team_name="team_a") team_b_executor = KubernetesExecutor(team_name="team_b") + team_a_executor.job_id = 1 + team_b_executor.job_id = 2 try: + # Queues are created lazily in start(), so each team executor gets its own. + team_a_executor.start() + team_b_executor.start() + assert team_a_executor.task_queue is not team_b_executor.task_queue assert team_a_executor.result_queue is not team_b_executor.result_queue assert team_a_executor.running is not team_b_executor.running