From da212558c5661aeec899ea63fe6ff9516d64caf4 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 28 Oct 2025 21:38:04 +0530 Subject: [PATCH 1/2] Revert "Fix memory leak in remote logging connection cache (#56695)" This reverts commit 416c73e864b5c9a52b50053baa7876bcb5bcfe38. --- .../airflow/sdk/execution_time/supervisor.py | 75 +++++++------------ .../execution_time/test_supervisor.py | 41 ---------- 2 files changed, 29 insertions(+), 87 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 9675887cb8a3e..53d63f4950055 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -33,6 +33,7 @@ from collections.abc import Callable, Generator from contextlib import contextmanager, suppress from datetime import datetime, timezone +from functools import lru_cache from http import HTTPStatus from socket import socket, socketpair from typing import ( @@ -826,10 +827,8 @@ def _check_subprocess_exit( return self._exit_code -_REMOTE_LOGGING_CONN_CACHE: dict[str, Connection | None] = {} - - -def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | None: +@lru_cache +def _get_remote_logging_conn(conn_id: str, client: Client) -> Connection | None: """ Fetch and cache connection for remote logging. @@ -838,22 +837,18 @@ def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | Non client: API client for making requests Returns: - Connection object or None if not found. + Connection object or None if not found """ # Since we need to use the API Client directly, we can't use Connection.get as that would try to use # SUPERVISOR_COMMS # TODO: Store in the SecretsCache if its enabled - see #48858 - if conn_id in _REMOTE_LOGGING_CONN_CACHE: - return _REMOTE_LOGGING_CONN_CACHE[conn_id] - backends = ensure_secrets_backend_loaded() for secrets_backend in backends: try: conn = secrets_backend.get_connection(conn_id=conn_id) if conn: - _REMOTE_LOGGING_CONN_CACHE[conn_id] = conn return conn except Exception: log.exception( @@ -867,12 +862,8 @@ def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | Non conn_result = ConnectionResult.from_conn_response(conn) from airflow.sdk.definitions.connection import Connection - result: Connection | None = Connection(**conn_result.model_dump(exclude={"type"}, by_alias=True)) - else: - result = None - - _REMOTE_LOGGING_CONN_CACHE[conn_id] = result - return result + return Connection(**conn_result.model_dump(exclude={"type"}, by_alias=True)) + return None @contextlib.contextmanager @@ -887,8 +878,7 @@ def _remote_logging_conn(client: Client): This is needed as the BaseHook.get_connection looks for SUPERVISOR_COMMS, but we are still in the supervisor process when this is needed, so that doesn't exist yet. - The connection details are fetched eagerly on every invocation to avoid retaining - per-task API client instances in global caches. + This function uses @lru_cache for connection caching to avoid repeated API calls. """ from airflow.sdk.log import load_remote_conn_id, load_remote_log_handler @@ -897,8 +887,8 @@ def _remote_logging_conn(client: Client): yield return - # Fetch connection details on-demand without caching the entire API client instance - conn = _fetch_remote_logging_conn(conn_id, client) + # Use cached connection fetcher + conn = _get_remote_logging_conn(conn_id, client) if conn: key = f"AIRFLOW_CONN_{conn_id.upper()}" @@ -1922,11 +1912,9 @@ def supervise( if not dag_rel_path: raise ValueError("dag_path is required") - close_client = False if not client: limits = httpx.Limits(max_keepalive_connections=1, max_connections=10) client = Client(base_url=server or "", limits=limits, dry_run=dry_run, token=token) - close_client = True start = time.monotonic() @@ -1945,29 +1933,24 @@ def supervise( reset_secrets_masker() - try: - process = ActivitySubprocess.start( - dag_rel_path=dag_rel_path, - what=ti, - client=client, - logger=logger, - bundle_info=bundle_info, - subprocess_logs_to_stdout=subprocess_logs_to_stdout, - ) + process = ActivitySubprocess.start( + dag_rel_path=dag_rel_path, + what=ti, + client=client, + logger=logger, + bundle_info=bundle_info, + subprocess_logs_to_stdout=subprocess_logs_to_stdout, + ) - exit_code = process.wait() - end = time.monotonic() - log.info( - "Task finished", - task_instance_id=str(ti.id), - exit_code=exit_code, - duration=end - start, - final_state=process.final_state, - ) - return exit_code - finally: - if log_path and log_file_descriptor: - log_file_descriptor.close() - if close_client and client: - with suppress(Exception): - client.close() + exit_code = process.wait() + end = time.monotonic() + log.info( + "Task finished", + task_instance_id=str(ti.id), + exit_code=exit_code, + duration=end - start, + final_state=process.final_state, + ) + if log_path and log_file_descriptor: + log_file_descriptor.close() + return exit_code diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 7cc057ccdd454..837968fe09759 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2630,47 +2630,6 @@ def mock_upload_to_remote(process_log, ti): assert connection_available["conn_uri"] is not None, "Connection URI was None during upload" -def test_remote_logging_conn_caches_connection_not_client(monkeypatch): - """Test that connection caching doesn't retain API client references.""" - import gc - import weakref - - from airflow.sdk import log as sdk_log - from airflow.sdk.execution_time import supervisor - - class ExampleBackend: - def __init__(self): - self.calls = 0 - - def get_connection(self, conn_id: str): - self.calls += 1 - from airflow.sdk.definitions.connection import Connection - - return Connection(conn_id=conn_id, conn_type="example") - - backend = ExampleBackend() - monkeypatch.setattr(supervisor, "ensure_secrets_backend_loaded", lambda: [backend]) - monkeypatch.setattr(sdk_log, "load_remote_log_handler", lambda: object()) - monkeypatch.setattr(sdk_log, "load_remote_conn_id", lambda: "test_conn") - monkeypatch.delenv("AIRFLOW_CONN_TEST_CONN", raising=False) - - def noop_request(request: httpx.Request) -> httpx.Response: - return httpx.Response(200) - - clients = [] - for _ in range(3): - client = make_client(transport=httpx.MockTransport(noop_request)) - clients.append(weakref.ref(client)) - with _remote_logging_conn(client): - pass - client.close() - del client - - gc.collect() - assert backend.calls == 1, "Connection should be cached, not fetched multiple times" - assert all(ref() is None for ref in clients), "Client instances should be garbage collected" - - def test_process_log_messages_from_subprocess(monkeypatch, caplog): from airflow.sdk._shared.logging.structlog import PER_LOGGER_LEVELS From 73525328e584fa1843f87e717b6fe7924d9ee383 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Sat, 29 Nov 2025 11:40:02 +0530 Subject: [PATCH 2/2] enable e2e ui test to install pnpm if not installed --- .../commands/testing_commands.py | 4 ++- .../src/airflow_breeze/utils/run_utils.py | 36 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py index ec672cd2a02d4..52d178d71bbde 100644 --- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py @@ -1471,11 +1471,13 @@ def ui_e2e_tests( from pathlib import Path from airflow_breeze.utils.console import get_console - from airflow_breeze.utils.run_utils import run_command + from airflow_breeze.utils.run_utils import check_pnpm_installed, run_command from airflow_breeze.utils.shared_options import get_dry_run, get_verbose perform_environment_checks() + check_pnpm_installed() + airflow_root = Path(__file__).resolve().parents[5] ui_dir = airflow_root / "airflow-core" / "src" / "airflow" / "ui" diff --git a/dev/breeze/src/airflow_breeze/utils/run_utils.py b/dev/breeze/src/airflow_breeze/utils/run_utils.py index 8c34f1e8a16f5..7a5374f3466db 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/run_utils.py @@ -260,6 +260,42 @@ def assert_prek_installed(): sys.exit(1) +def check_pnpm_installed(): + """ + Check if pnpm is installed and install it if npm is available. + """ + if shutil.which("pnpm"): + return + + get_console().print("[warning]pnpm is not installed. Installing pnpm...[/]") + + # Check if npm is available (required to install pnpm) + if not shutil.which("npm"): + get_console().print("[error]npm is not installed. Please install Node.js and npm first.[/]") + get_console().print("[warning]Visit: https://nodejs.org/[/]") + sys.exit(1) + + try: + get_console().print("[info]Installing pnpm using npm...[/]") + result = run_command( + ["npm", "install", "-g", "pnpm"], + no_output_dump_on_exception=True, + capture_output=True, + text=True, + check=False, + ) + if result.returncode == 0: + get_console().print("[success]pnpm has been installed successfully![/]") + else: + get_console().print(f"[error]Failed to install pnpm: {result.stderr}[/]") + get_console().print("[warning]Please install pnpm manually: https://pnpm.io/installation[/]") + sys.exit(1) + except Exception as e: + get_console().print(f"[error]Failed to install pnpm: {e}[/]") + get_console().print("[warning]Please install pnpm manually: https://pnpm.io/installation[/]") + sys.exit(1) + + def get_filesystem_type(filepath: str): """ Determine the type of filesystem used - we might want to use different parameters if tmpfs is used.