From a3b4aab802eb030ef44f99d200b42988a238ee69 Mon Sep 17 00:00:00 2001 From: Subham Sangwan Date: Thu, 18 Jun 2026 11:24:14 +0530 Subject: [PATCH] Fix API server memory leak from KubernetesExecutor Manager processes --- .../airflow/utils/log/file_task_handler.py | 15 +++++++++++ .../tests/unit/utils/test_log_handlers.py | 25 +++++++++++++------ .../executors/kubernetes_executor.py | 25 +++++++++++++++++-- .../executors/test_kubernetes_executor.py | 24 ++++++++++++++++++ .../log_handlers/test_log_handlers.py | 4 +-- 5 files changed, 82 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 8a94b513b4163..558aafb4e206c 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -573,6 +573,21 @@ def _get_executor_get_task_log( if executor is not None: return executor.get_task_log + # Load executor class without instantiating to check if it has a static log method + try: + if executor_name == self.DEFAULT_EXECUTOR_KEY: + executor_cls, _ = ExecutorLoader.import_default_executor_cls() + else: + executor_name_obj = ExecutorLoader.lookup_executor_name_by_str(executor_name) + executor_cls, _ = ExecutorLoader.import_executor_cls(executor_name_obj) + + # Check if executor has a static log method to avoid instantiation leak + if hasattr(executor_cls, "_get_task_log_static"): + return executor_cls._get_task_log_static + except Exception: + # Fall back to instantiation if static method lookup fails + pass + if executor_name == self.DEFAULT_EXECUTOR_KEY: self.executor_instances[executor_name] = ExecutorLoader.get_default_executor() else: diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 1c52f84a5f68b..a70c9eee3150e 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -291,7 +291,14 @@ def test_file_task_handler_with_multiple_executors( else: path_to_executor_class = executors_mapping.get(executor_name) - with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log: + from importlib import import_module + + module_path, class_name = path_to_executor_class.rsplit(".", 1) + mod = import_module(module_path) + cls = getattr(mod, class_name) + method_to_patch = "_get_task_log_static" if hasattr(cls, "_get_task_log_static") else "get_task_log" + + with patch(f"{path_to_executor_class}.{method_to_patch}", return_value=([], [])) as mock_get_task_log: mock_get_task_log.return_value = ([], []) ti = create_task_instance( dag_id="dag_for_testing_multiple_executors", @@ -328,13 +335,17 @@ def test_file_task_handler_with_multiple_executors( os.remove(log_filename) mock_get_task_log.assert_called_once() - if executor_name is None: - mock_get_default_executor.assert_called_once() - # will be called in `ExecutorLoader.get_default_executor` method - mock_load_executor.assert_called_once_with(default_executor_name) - else: + if method_to_patch == "_get_task_log_static": mock_get_default_executor.assert_not_called() - mock_load_executor.assert_called_once_with(executor_name) + mock_load_executor.assert_not_called() + else: + if executor_name is None: + mock_get_default_executor.assert_called_once() + # will be called in `ExecutorLoader.get_default_executor` method + mock_load_executor.assert_called_once_with(default_executor_name) + else: + mock_get_default_executor.assert_not_called() + mock_load_executor.assert_called_once_with(executor_name) def test_file_task_handler_running(self, dag_maker): def task_callable(ti): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 0dcc01537cb78..daa57267edd9b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -509,7 +509,28 @@ def _get_pod_namespace(self, ti: TaskInstance): namespace = pod_override.metadata.namespace return namespace or self.conf.get("kubernetes_executor", "namespace") + @classmethod + def _get_pod_namespace_for_log(cls, ti: TaskInstance) -> str: + """Get pod namespace for log reading without requiring executor instance.""" + pod_override = (ti.executor_config or {}).get("pod_override") + namespace = None + with suppress(Exception): + if pod_override is not None: + namespace = pod_override.metadata.namespace + return namespace or conf.get("kubernetes_executor", "namespace") + def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: + """Fetch task log from running Kubernetes pod (instance method for backward compatibility).""" + return self.__class__._get_task_log_static(ti, try_number) + + @classmethod + def _get_task_log_static(cls, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: + """ + Fetch task log from running Kubernetes pod without requiring executor instantiation. + + This method does not create a multiprocessing.Manager, avoiding the memory leak + that occurs when the API server reads logs from running tasks. + """ messages = [] log = [] try: @@ -527,7 +548,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li run_id=ti.run_id, airflow_worker=ti.queued_by_job_id, ) - namespace = self._get_pod_namespace(ti) + namespace = cls._get_pod_namespace_for_log(ti) pod_list = client.list_namespaced_pod( namespace=namespace, label_selector=selector, @@ -541,7 +562,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li namespace=namespace, container="base", follow=False, - tail_lines=self.RUNNING_POD_LOG_LINES, + tail_lines=cls.RUNNING_POD_LOG_LINES, _preload_content=False, ) for line in res: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index bc1c2a97f55c7..95d759399a25b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1726,6 +1726,30 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat "Reading from k8s pod logs failed: error_fetching_pod_log", ] + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_get_task_log_without_executor_instantiation( + self, mock_get_kube_client, create_task_instance_of_operator + ): + """Verify get_task_log works via classmethod without creating multiprocessing.Manager.""" + mock_kube_client = mock_get_kube_client.return_value + mock_kube_client.read_namespaced_pod_log.return_value = [b"log_line_1", b"log_line_2"] + mock_pod = mock.Mock() + mock_pod.metadata.name = "test-pod" + mock_kube_client.list_namespaced_pod.return_value.items = [mock_pod] + + ti = create_task_instance_of_operator(EmptyOperator, dag_id="test_dag", task_id="test_task") + + # Call the static method directly without creating an executor instance + messages, logs = KubernetesExecutor._get_task_log_static(ti=ti, try_number=1) + + assert messages == [ + "Attempting to fetch logs from pod through kube API", + "Found logs through kube API", + ] + assert logs[0] == "log_line_1\nlog_line_2" + mock_kube_client.read_namespaced_pod_log.assert_called_once() + @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration") def test_sentry_integration(self): assert not KubernetesExecutor.sentry_integration diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py index 1a95dd4349b32..18cf4dbab4190 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py @@ -74,12 +74,12 @@ def teardown_method(self): self.clean_up() @mock.patch( - "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log" + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._get_task_log_static" ) @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]) @pytest.mark.usefixtures("clean_executor_loader") def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state): - """Test for k8s executor, the log is read from get_task_log method""" + """Test for k8s executor, the log is read from _get_task_log_static method""" mock_k8s_get_task_log.return_value = ([], []) executor_name = "KubernetesExecutor" ti = create_task_instance(