diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py index 8a0d8bbacd122..ce2ff1ccdf647 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py @@ -25,7 +25,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.security import requires_access_dag +from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, requires_access_dag from airflow.api_fastapi.core_api.services.ui.dependencies import extract_single_connected_component from airflow.models.serialized_dag import SerializedDagModel @@ -41,12 +41,20 @@ ), dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.DEPENDENCIES))], ) -def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse: +def get_dependencies( + session: SessionDep, + readable_dags_filter: ReadableDagsFilterDep, + node_id: str | None = None, +) -> BaseGraphResponse: """Dependencies graph.""" nodes_dict: dict[str, dict] = {} edge_tuples: set[tuple[str, str]] = set() - for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()): + dag_dependencies = SerializedDagModel.get_dag_dependencies() + readable_dag_ids = readable_dags_filter.value + for dag, dependencies in sorted(dag_dependencies.items()): + if readable_dag_ids is not None and dag not in readable_dag_ids: + continue dag_node_id = f"dag:{dag}" if dag_node_id not in nodes_dict: for dep in dependencies: diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py index c18ffe92d4a07..fbc427f138215 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from unittest import mock + import pendulum import pytest from sqlalchemy import select @@ -204,7 +206,7 @@ def expected_secondary_component_response(asset2_id): class TestGetDependencies: @pytest.mark.usefixtures("make_primary_connected_component") def test_should_response_200(self, test_client, expected_primary_component_response): - with assert_queries_count(5): + with assert_queries_count(6): response = test_client.get("/dependencies") assert response.status_code == 200 @@ -240,7 +242,7 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") def test_with_node_id_filter(self, test_client, node_id, expected_response_fixture, request): expected_response = request.getfixturevalue(expected_response_fixture) - with assert_queries_count(5): + with assert_queries_count(6): response = test_client.get("/dependencies", params={"node_id": node_id}) assert response.status_code == 200 @@ -258,7 +260,7 @@ def test_with_node_id_filter_with_asset( (asset1_id, expected_primary_component_response), (asset2_id, expected_secondary_component_response), ): - with assert_queries_count(5): + with assert_queries_count(6): response = test_client.get("/dependencies", params={"node_id": f"asset:{asset_id}"}) assert response.status_code == 200 @@ -272,3 +274,26 @@ def test_with_node_id_filter_not_found(self, test_client): assert response.json() == { "detail": "Unique connected component not found, got [] for connected components of node missing_node_id, expected only 1 connected component.", } + + @mock.patch( + "airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids", + return_value={"upstream", "downstream"}, + ) + @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") + def test_scheduling_dependencies_respects_readable_dags_filter(self, _, test_client): + response = test_client.get("/dependencies") + assert response.status_code == 200 + + result = response.json() + dag_node_ids = {node["id"] for node in result["nodes"] if node["type"] == "dag"} + expected_present = ["dag:upstream", "dag:downstream"] + expected_absent = [ + "dag:other_dag", + "dag:external_trigger_dag_id", + "dag:upstream_secondary", + "dag:downstream_secondary", + ] + for node_id in expected_present: + assert node_id in dag_node_ids + for node_id in expected_absent: + assert node_id not in dag_node_ids