From 4be15d5474203178eece6fbc8fabf574136d2479 Mon Sep 17 00:00:00 2001 From: AndreiLeib Date: Wed, 10 Jun 2026 20:12:14 -0400 Subject: [PATCH 1/2] do not cache None connection values Signed-off-by: AndreiLeib --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 3ab7eb1f8d654..e6214f4e4a2eb 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1180,10 +1180,10 @@ def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | Non from airflow.sdk.definitions.connection import Connection result: Connection | None = Connection(**conn_result.model_dump(exclude={"type"}, by_alias=True)) + _REMOTE_LOGGING_CONN_CACHE[conn_id] = result else: result = None - _REMOTE_LOGGING_CONN_CACHE[conn_id] = result return result From 510be11926628d1d2b29eef57243b8d73ecd43f9 Mon Sep 17 00:00:00 2001 From: AndreiLeib Date: Sun, 14 Jun 2026 13:23:25 -0400 Subject: [PATCH 2/2] unit test: conn cache does not retain failed lookups as None Signed-off-by: AndreiLeib --- .../execution_time/test_supervisor.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 41a1616160604..e9ff3a534466e 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -61,6 +61,7 @@ AssetEventResponse, AssetProfile, AssetResponse, + ConnectionResponse, DagRun, DagRunState, DagRunType, @@ -3788,6 +3789,37 @@ def noop_request(request: httpx.Request) -> httpx.Response: assert all(ref() is None for ref in clients), "Client instances should be garbage collected" +def test_fetch_remote_logging_conn_does_not_cache_none_result(mocker): + """Test that connection caching doesn't cache failed lookups as None.""" + conn_id = "test_conn" + client = mocker.Mock() + mocker.patch.object(supervisor, "ensure_secrets_backend_loaded", return_value=[]) + mocker.patch.dict(supervisor._REMOTE_LOGGING_CONN_CACHE, {}, clear=True) + client.connections.get.side_effect = [ + ErrorResponse(error=ErrorType.PERMISSION_DENIED), + ConnectionResponse( + conn_id=conn_id, + conn_type="example", + host=None, + schema_=None, + login=None, + password=None, + port=None, + extra=None, + ), + ] + + assert supervisor._fetch_remote_logging_conn(conn_id, client) is None + assert conn_id not in supervisor._REMOTE_LOGGING_CONN_CACHE + + second_call_result = supervisor._fetch_remote_logging_conn(conn_id, client) + assert second_call_result is not None + assert second_call_result.conn_id == conn_id + assert supervisor._REMOTE_LOGGING_CONN_CACHE[conn_id] is not None + # The first call resulted in None and was not cached, so the second fetch calls the API again. + assert client.connections.get.call_count == 2 + + def test_process_log_messages_from_subprocess(monkeypatch, caplog): from airflow.sdk._shared.logging.structlog import PER_LOGGER_LEVELS