From 6a61bc0d1d65b47b6dc7cd0430255b1f2dc35858 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 29 Apr 2026 13:32:05 +0530 Subject: [PATCH 1/2] Add tests for ti:self JWT scope enforcement on Execution API The ti:self scope check at security.py rejects requests where the JWT subject does not match the task_instance_id in the path. This is the cross-task isolation guarantee for any router that opts into ti:self (e.g. /task-instances, /hitl, and the upcoming task state endpoints from AIP-103). Until now the check was uncovered. Adds three tests mirroring the existing TestTokenTypeScopeEnforcement pattern: matching subject is accepted, mismatched subject returns 403, and routes without ti:self ignore subject mismatches. --- .../execution_api/test_security.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py index 985dfd325994d..62a9536bf3568 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py @@ -158,6 +158,66 @@ def test_execution_token_accepted_on_both_routes(self, token_type_app): assert run.status_code == 200 +class TestTiSelfScopeEnforcement: + """End-to-end: routes with the ``ti:self`` Security scope reject mismatched JWT subjects.""" + + PATH_TI_ID = "00000000-0000-0000-0000-000000000001" + OTHER_TI_ID = "00000000-0000-0000-0000-000000000002" + + @pytest.fixture + def app(self): + """One router enforces ti:self, another doesn't — to confirm enforcement is opt-in.""" + app = FastAPI() + + authenticated_router = APIRouter(dependencies=[Security(require_auth)]) + ti_self_router = APIRouter(dependencies=[Security(require_auth, scopes=["ti:self"])]) + + @ti_self_router.get("/{task_instance_id}/state") + def state_endpoint(task_instance_id: str): + return {"ok": True} + + @authenticated_router.get("/no-scope/{task_instance_id}") + def no_scope_endpoint(task_instance_id: str): + return {"ok": True} + + authenticated_router.include_router(ti_self_router, prefix="/ti") + app.include_router(authenticated_router) + return app + + def _override_jwt(self, app, token_ti_id: str): + async def mock_jwt(request: Request): + return TIToken(id=UUID(token_ti_id), claims=TIClaims(scope="execution")) + + app.dependency_overrides[_jwt_bearer] = mock_jwt + + def test_matching_subject_is_accepted(self, app): + self._override_jwt(app, self.PATH_TI_ID) + client = TestClient(app) + + resp = client.get(f"/ti/{self.PATH_TI_ID}/state", headers={"Authorization": "Bearer fake"}) + + assert resp.status_code == 200 + + def test_mismatched_subject_is_rejected(self, app): + """A task cannot read or write another task's resources — the cross-TI isolation guarantee.""" + self._override_jwt(app, self.OTHER_TI_ID) + client = TestClient(app) + + resp = client.get(f"/ti/{self.PATH_TI_ID}/state", headers={"Authorization": "Bearer fake"}) + + assert resp.status_code == 403 + assert "does not match" in resp.json()["detail"] + + def test_no_enforcement_without_ti_self_scope(self, app): + """Routes without ``ti:self`` don't compare the JWT subject to the path.""" + self._override_jwt(app, self.OTHER_TI_ID) + client = TestClient(app) + + resp = client.get(f"/no-scope/{self.PATH_TI_ID}", headers={"Authorization": "Bearer fake"}) + + assert resp.status_code == 200 + + class TestGetTeamNameDep: """Tests for get_team_name_dep avoiding unnecessary async sessions.""" From da43e3d83df558f7bed9bc00664c29962fa5b409 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 29 Apr 2026 13:50:30 +0530 Subject: [PATCH 2/2] update --- .../execution_api/test_security.py | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py index 62a9536bf3568..22b7f58a95993 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_security.py @@ -159,7 +159,7 @@ def test_execution_token_accepted_on_both_routes(self, token_type_app): class TestTiSelfScopeEnforcement: - """End-to-end: routes with the ``ti:self`` Security scope reject mismatched JWT subjects.""" + """Routes with the ``ti:self`` scope reject mismatched JWT subjects.""" PATH_TI_ID = "00000000-0000-0000-0000-000000000001" OTHER_TI_ID = "00000000-0000-0000-0000-000000000002" @@ -184,9 +184,9 @@ def no_scope_endpoint(task_instance_id: str): app.include_router(authenticated_router) return app - def _override_jwt(self, app, token_ti_id: str): + def _override_jwt(self, app: FastAPI, token_ti_id: UUID): async def mock_jwt(request: Request): - return TIToken(id=UUID(token_ti_id), claims=TIClaims(scope="execution")) + return TIToken(id=token_ti_id, claims=TIClaims(scope="execution")) app.dependency_overrides[_jwt_bearer] = mock_jwt @@ -194,29 +194,26 @@ def test_matching_subject_is_accepted(self, app): self._override_jwt(app, self.PATH_TI_ID) client = TestClient(app) - resp = client.get(f"/ti/{self.PATH_TI_ID}/state", headers={"Authorization": "Bearer fake"}) + resp = client.get( + f"/ti/{self.PATH_TI_ID}/state", + headers={"Authorization": "Bearer fake"}, + ) assert resp.status_code == 200 def test_mismatched_subject_is_rejected(self, app): - """A task cannot read or write another task's resources — the cross-TI isolation guarantee.""" + """A task cannot read or write another task's resources.""" self._override_jwt(app, self.OTHER_TI_ID) client = TestClient(app) - resp = client.get(f"/ti/{self.PATH_TI_ID}/state", headers={"Authorization": "Bearer fake"}) + resp = client.get( + f"/ti/{self.PATH_TI_ID}/state", + headers={"Authorization": "Bearer fake"}, + ) assert resp.status_code == 403 assert "does not match" in resp.json()["detail"] - def test_no_enforcement_without_ti_self_scope(self, app): - """Routes without ``ti:self`` don't compare the JWT subject to the path.""" - self._override_jwt(app, self.OTHER_TI_ID) - client = TestClient(app) - - resp = client.get(f"/no-scope/{self.PATH_TI_ID}", headers={"Authorization": "Bearer fake"}) - - assert resp.status_code == 200 - class TestGetTeamNameDep: """Tests for get_team_name_dep avoiding unnecessary async sessions."""