From d89c8beba63577225f6af4c37cdaa0d7ecbba0d6 Mon Sep 17 00:00:00 2001 From: yulit0738 Date: Thu, 5 Feb 2026 12:23:10 +0900 Subject: [PATCH 1/2] fix(providers/fab): add session cleanup middleware to FAB FastAPI app Add HTTP middleware to get_fastapi_app() that calls Session.remove() after each request. This prevents PendingRollbackError that can occur when database connections time out while transactions are in 'idle in transaction' state. Unlike Flask's teardown_appcontext, FastAPI doesn't automatically clean up SQLAlchemy scoped sessions, which can leave transactions open and eventually cause session invalidation errors. --- providers/fab/newsfragments/61480.bugfix.rst | 1 + .../fab/auth_manager/fab_auth_manager.py | 20 +++++ .../fab/auth_manager/test_fab_auth_manager.py | 82 +++++++++++++++++++ 3 files changed, 103 insertions(+) create mode 100644 providers/fab/newsfragments/61480.bugfix.rst diff --git a/providers/fab/newsfragments/61480.bugfix.rst b/providers/fab/newsfragments/61480.bugfix.rst new file mode 100644 index 0000000000000..a749e70c7a8c2 --- /dev/null +++ b/providers/fab/newsfragments/61480.bugfix.rst @@ -0,0 +1 @@ +Fix ``PendingRollbackError`` on FAB admin pages by adding session cleanup middleware to the FAB FastAPI app. diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index eb0fb75275fd7..8219b262d7264 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -222,6 +222,26 @@ def get_fastapi_app(self) -> FastAPI | None: app.include_router(roles_router) app.include_router(users_router) + # Session cleanup middleware to prevent PendingRollbackError. + # FAB's Flask views (e.g., /users/list/, /roles/list/) are mounted below via + # WSGIMiddleware. These views use settings.Session (SQLAlchemy scoped_session), + # but unlike a native Flask app where teardown_appcontext calls Session.remove(), + # the WSGI wrapper does not trigger Flask's teardown hooks. + # Without explicit cleanup, sessions remain in "idle in transaction" state. + # When the database connection times out (e.g., PostgreSQL's + # idle_in_transaction_session_timeout), subsequent requests reusing the + # invalidated session raise PendingRollbackError. + @app.middleware("http") + async def cleanup_session_middleware(request, call_next): + try: + response = await call_next(request) + return response + finally: + from airflow import settings + + if settings.Session: + settings.Session.remove() + app.mount("/", WSGIMiddleware(flask_app)) return app diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index 248c57e73c5ca..35856f7fb6ec4 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -957,3 +957,85 @@ def test_resetdb( mock_init.assert_not_called() else: mock_init.assert_called_once() + + +class TestFabAuthManagerSessionCleanup: + """Test session cleanup middleware in FAB auth manager FastAPI app. + + Background: + FAB auth manager's FastAPI app has the following route structure: + - /token, /logout: FastAPI routes (login_router) + - /users/*, /roles/*: FastAPI API routes + - /*: WSGIMiddleware -> Flask App (FAB views like /users/list/, /roles/list/) + + Problem: + FAB's Flask views (e.g., /users/list/, /roles/list/) use settings.Session + (SQLAlchemy scoped_session). In a normal Flask app, teardown_appcontext + automatically calls Session.remove() after each request. However, when Flask + is mounted via WSGIMiddleware in FastAPI, teardown_appcontext does NOT trigger. + + This leaves database sessions in "idle in transaction" state. When the database + connection times out (e.g., PostgreSQL's idle_in_transaction_session_timeout), + subsequent requests reusing the invalidated session raise PendingRollbackError. + + Solution: + Add a FastAPI middleware that calls Session.remove() in the finally block, + ensuring session cleanup for ALL requests including those forwarded to Flask via WSGI. + """ + + @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app") + def test_session_cleanup_middleware_on_wsgi_route(self, mock_create_app): + """Test Session.remove() is called after requests to WSGI-mounted Flask routes. + + This is the critical scenario: requests to Flask AppBuilder views like + /users/list/ and /roles/list/ go through WSGIMiddleware. Without the + cleanup middleware, these requests leave sessions in "idle in transaction" + state, eventually causing PendingRollbackError. + """ + from unittest.mock import patch + + from fastapi.testclient import TestClient + + # Setup mock Flask app (simulates FAB's Flask app) + mock_flask_app = MagicMock() + mock_create_app.return_value = mock_flask_app + + auth_manager = FabAuthManager() + fastapi_app = auth_manager.get_fastapi_app() + + client = TestClient(fastapi_app, raise_server_exceptions=False) + + with patch("airflow.settings.Session") as mock_session: + # Request to a path not handled by FastAPI routers goes to WSGIMiddleware -> Flask + # This simulates accessing /users/list/ or /roles/list/ which caused the original bug + client.get("/users/list/") + + # Verify Session.remove() was called by the cleanup middleware + mock_session.remove.assert_called() + + @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app") + def test_session_cleanup_middleware_on_fastapi_route(self, mock_create_app): + """Test Session.remove() is also called after FastAPI route requests. + + Even though FastAPI routes may not directly use settings.Session, + the middleware should clean up any session that might have been + used during request processing (e.g., by dependencies or nested calls). + """ + from unittest.mock import patch + + from fastapi.testclient import TestClient + + mock_flask_app = MagicMock() + mock_create_app.return_value = mock_flask_app + + auth_manager = FabAuthManager() + fastapi_app = auth_manager.get_fastapi_app() + + client = TestClient(fastapi_app, raise_server_exceptions=False) + + with patch("airflow.settings.Session") as mock_session: + # Request to a FastAPI route (login endpoint) + client.post("/token", json={"username": "test", "password": "test"}) + + # Verify Session.remove() was called + mock_session.remove.assert_called() From a8052711f6f4fa1b7d40895ad8bd8b7c1cefd6c6 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Tue, 10 Feb 2026 18:41:31 +0200 Subject: [PATCH 2/2] Delete providers/fab/newsfragments/61480.bugfix.rst --- providers/fab/newsfragments/61480.bugfix.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 providers/fab/newsfragments/61480.bugfix.rst diff --git a/providers/fab/newsfragments/61480.bugfix.rst b/providers/fab/newsfragments/61480.bugfix.rst deleted file mode 100644 index a749e70c7a8c2..0000000000000 --- a/providers/fab/newsfragments/61480.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Fix ``PendingRollbackError`` on FAB admin pages by adding session cleanup middleware to the FAB FastAPI app.