Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,21 @@ def _get_executor_get_task_log(
if executor is not None:
return executor.get_task_log

# Load executor class without instantiating to check if it has a static log method
try:
if executor_name == self.DEFAULT_EXECUTOR_KEY:
executor_cls, _ = ExecutorLoader.import_default_executor_cls()
else:
executor_name_obj = ExecutorLoader.lookup_executor_name_by_str(executor_name)
executor_cls, _ = ExecutorLoader.import_executor_cls(executor_name_obj)

# Check if executor has a static log method to avoid instantiation leak
if hasattr(executor_cls, "_get_task_log_static"):
return executor_cls._get_task_log_static
except Exception:
# Fall back to instantiation if static method lookup fails
pass

if executor_name == self.DEFAULT_EXECUTOR_KEY:
self.executor_instances[executor_name] = ExecutorLoader.get_default_executor()
else:
Expand Down
25 changes: 18 additions & 7 deletions airflow-core/tests/unit/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ def test_file_task_handler_with_multiple_executors(
else:
path_to_executor_class = executors_mapping.get(executor_name)

with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log:
from importlib import import_module

module_path, class_name = path_to_executor_class.rsplit(".", 1)
mod = import_module(module_path)
cls = getattr(mod, class_name)
method_to_patch = "_get_task_log_static" if hasattr(cls, "_get_task_log_static") else "get_task_log"

with patch(f"{path_to_executor_class}.{method_to_patch}", return_value=([], [])) as mock_get_task_log:
mock_get_task_log.return_value = ([], [])
ti = create_task_instance(
dag_id="dag_for_testing_multiple_executors",
Expand Down Expand Up @@ -328,13 +335,17 @@ def test_file_task_handler_with_multiple_executors(
os.remove(log_filename)
mock_get_task_log.assert_called_once()

if executor_name is None:
mock_get_default_executor.assert_called_once()
# will be called in `ExecutorLoader.get_default_executor` method
mock_load_executor.assert_called_once_with(default_executor_name)
else:
if method_to_patch == "_get_task_log_static":
mock_get_default_executor.assert_not_called()
mock_load_executor.assert_called_once_with(executor_name)
mock_load_executor.assert_not_called()
else:
if executor_name is None:
mock_get_default_executor.assert_called_once()
# will be called in `ExecutorLoader.get_default_executor` method
mock_load_executor.assert_called_once_with(default_executor_name)
else:
mock_get_default_executor.assert_not_called()
mock_load_executor.assert_called_once_with(executor_name)

def test_file_task_handler_running(self, dag_maker):
def task_callable(ti):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,28 @@ def _get_pod_namespace(self, ti: TaskInstance):
namespace = pod_override.metadata.namespace
return namespace or self.conf.get("kubernetes_executor", "namespace")

@classmethod
def _get_pod_namespace_for_log(cls, ti: TaskInstance) -> str:
    """
    Resolve the namespace to read a task's pod logs from.

    Works at class level, so no executor instance is needed. A namespace set on
    the ``pod_override`` entry of the task instance's ``executor_config`` wins;
    otherwise the ``[kubernetes_executor] namespace`` config value is returned.

    :param ti: task instance whose ``executor_config`` is inspected
    :return: namespace taken from the pod override, or the configured default
    """
    executor_config = ti.executor_config or {}
    override = executor_config.get("pod_override")

    resolved_namespace = None
    if override is not None:
        # Best effort: any error while reading the override's metadata is
        # swallowed so that the configured namespace is used instead.
        with suppress(Exception):
            resolved_namespace = override.metadata.namespace

    if resolved_namespace:
        return resolved_namespace
    return conf.get("kubernetes_executor", "namespace")

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
    """
    Fetch task log from a running Kubernetes pod.

    Instance-level entry point kept for backward compatibility; the work is
    delegated to the class-level ``_get_task_log_static`` of this instance's class.

    :param ti: task instance whose log is requested
    :param try_number: try number of the task instance
    :return: whatever ``_get_task_log_static`` returns for the same arguments
    """
    executor_cls = self.__class__
    return executor_cls._get_task_log_static(ti, try_number)

@classmethod
def _get_task_log_static(cls, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
"""
Fetch task log from running Kubernetes pod without requiring executor instantiation.

This method does not create a multiprocessing.Manager, avoiding the memory leak
that occurs when the API server reads logs from running tasks.
"""
messages = []
log = []
try:
Expand All @@ -527,7 +548,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
run_id=ti.run_id,
airflow_worker=ti.queued_by_job_id,
)
namespace = self._get_pod_namespace(ti)
namespace = cls._get_pod_namespace_for_log(ti)
pod_list = client.list_namespaced_pod(
namespace=namespace,
label_selector=selector,
Expand All @@ -541,7 +562,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
namespace=namespace,
container="base",
follow=False,
tail_lines=self.RUNNING_POD_LOG_LINES,
tail_lines=cls.RUNNING_POD_LOG_LINES,
_preload_content=False,
)
for line in res:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,30 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
"Reading from k8s pod logs failed: error_fetching_pod_log",
]

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_get_task_log_without_executor_instantiation(
    self, mock_get_kube_client, create_task_instance_of_operator
):
    """Call ``_get_task_log_static`` on the class itself and check the log it returns."""
    # Mocked kube client: one matching pod that yields two raw log lines.
    kube_client = mock_get_kube_client.return_value
    running_pod = mock.Mock()
    running_pod.metadata.name = "test-pod"
    kube_client.list_namespaced_pod.return_value.items = [running_pod]
    kube_client.read_namespaced_pod_log.return_value = [b"log_line_1", b"log_line_2"]

    ti = create_task_instance_of_operator(EmptyOperator, dag_id="test_dag", task_id="test_task")

    # Invoked on the class: no KubernetesExecutor instance is created in this test.
    messages, logs = KubernetesExecutor._get_task_log_static(ti=ti, try_number=1)

    expected_messages = [
        "Attempting to fetch logs from pod through kube API",
        "Found logs through kube API",
    ]
    assert messages == expected_messages
    assert logs[0] == "log_line_1\nlog_line_2"
    kube_client.read_namespaced_pod_log.assert_called_once()

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
def test_sentry_integration(self):
assert not KubernetesExecutor.sentry_integration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ def teardown_method(self):
self.clean_up()

@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._get_task_log_static"
)
@pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
@pytest.mark.usefixtures("clean_executor_loader")
def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
"""Test for k8s executor, the log is read from get_task_log method"""
"""Test for k8s executor, the log is read from _get_task_log_static method"""
mock_k8s_get_task_log.return_value = ([], [])
executor_name = "KubernetesExecutor"
ti = create_task_instance(
Expand Down
Loading