diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py index 8485bd66a467c..b4e3d8a514b73 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py @@ -22,7 +22,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import and_, select +from sqlalchemy import and_, or_, select from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.models.batch_apis import IsAuthorizedDagRequest @@ -84,6 +84,11 @@ def get_import_error( file_dag_ids = set( session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == error.filename)).all() ) + + # No DAGs in the file (failed to parse), nothing to check permissions against + if not file_dag_ids: + return error + # Can the user read any DAGs in the file? if not readable_dag_ids.intersection(file_dag_ids): raise HTTPException( @@ -129,7 +134,11 @@ def get_import_errors( """Get all import errors.""" auth_manager = get_auth_manager() readable_dag_ids = auth_manager.get_authorized_dag_ids(method="GET", user=user) - # Build a cte that fetches dag_ids for each file location + + # Subquery for files that have any DAGs + files_with_any_dags = select(DagModel.relative_fileloc).distinct().subquery() + + # CTE for DAGs the user can read visible_files_cte = ( select(DagModel.relative_fileloc, DagModel.dag_id, DagModel.bundle_name) .where(DagModel.dag_id.in_(readable_dag_ids)) @@ -140,13 +149,23 @@ def get_import_errors( # Each returned row will be a tuple: (ParseImportError, dag_id) import_errors_stmt = ( select(ParseImportError, visible_files_cte.c.dag_id) - .join( + .outerjoin( + files_with_any_dags, + ParseImportError.filename == files_with_any_dags.c.relative_fileloc, + ) + .outerjoin( visible_files_cte, and_( ParseImportError.filename == visible_files_cte.c.relative_fileloc, ParseImportError.bundle_name == visible_files_cte.c.bundle_name, ), ) + .where( + or_( + files_with_any_dags.c.relative_fileloc.is_(None), + visible_files_cte.c.dag_id.isnot(None), + ) + ) .order_by(ParseImportError.id) ) @@ -164,14 +183,20 @@ def get_import_errors( ) import_errors = [] - for import_error, file_dag_ids in import_errors_result: + for import_error, file_dag_ids_iter in import_errors_result: + dag_ids = [dag_id for _, dag_id in file_dag_ids_iter if dag_id is not None] + + if not dag_ids: + import_errors.append(import_error) + continue + # Check if user has read access to all the DAGs defined in the file requests: Sequence[IsAuthorizedDagRequest] = [ { "method": "GET", "details": DagDetails(id=dag_id), } - for dag_id in file_dag_ids + for dag_id in dag_ids ] if not auth_manager.batch_is_authorized_dag(requests, user=user): session.expunge(import_error) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py index 7e72e1401da57..4d8e9aae1d694 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py @@ -235,6 +235,7 @@ def test_should_raises_403_unauthorized(self, unauthorized_test_client, import_e response = unauthorized_test_client.get(f"/importErrors/{import_error_id}") assert response.status_code == 403 + @pytest.mark.usefixtures("not_permitted_dag_model") @mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager") def test_should_raises_403_unauthorized__user_can_not_read_any_dags_in_file( self, mock_get_auth_manager, test_client, import_errors @@ -272,6 +273,23 @@ def test_get_import_error__user_dont_have_read_permission_to_read_all_dags_in_fi "bundle_name": BUNDLE_NAME, } + @mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager") + def test_get_import_error__no_dag_in_dagmodel(self, mock_get_auth_manager, test_client, import_errors): + """Test import error is returned when no DAG exists in DagModel.""" + import_error_id = import_errors[0].id + set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager, set()) + + response = test_client.get(f"/importErrors/{import_error_id}") + + assert response.status_code == 200 + assert response.json() == { + "import_error_id": import_error_id, + "timestamp": from_datetime_to_zulu_without_ms(TIMESTAMP1), + "filename": FILENAME1, + "stack_trace": STACKTRACE1, + "bundle_name": BUNDLE_NAME, + } + class TestGetImportErrors: @pytest.mark.parametrize( @@ -395,7 +413,6 @@ def test_should_raises_403_unauthorized(self, unauthorized_test_client): ), ], ) - @pytest.mark.usefixtures("permitted_dag_model") @mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager") def test_user_can_not_read_all_dags_in_file( self, @@ -403,18 +420,17 @@ def test_user_can_not_read_all_dags_in_file( test_client, batch_is_authorized_dag_return_value, expected_stack_trace, - permitted_dag_model, + permitted_dag_model_all, import_errors, ): mock_get_authorized_dag_ids = set_mock_auth_manager__get_authorized_dag_ids( - mock_get_auth_manager, {permitted_dag_model.dag_id} + mock_get_auth_manager, {"dag_id1"} ) set_mock_auth_manager__batch_is_authorized_dag( mock_get_auth_manager, batch_is_authorized_dag_return_value ) # Act - with assert_queries_count(3): - response = test_client.get("/importErrors") + response = test_client.get("/importErrors") # Assert mock_get_authorized_dag_ids.assert_called_once_with(method="GET", user=mock.ANY) assert response.status_code == 200 @@ -432,14 +448,13 @@ def test_user_can_not_read_all_dags_in_file( ], } - @pytest.mark.usefixtures("permitted_dag_model") @mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager") def test_bundle_name_join_condition_for_import_errors( - self, mock_get_auth_manager, test_client, permitted_dag_model, import_errors, session + self, mock_get_auth_manager, test_client, permitted_dag_model_all, import_errors, session ): """Test that the bundle_name join condition works correctly.""" mock_get_authorized_dag_ids = set_mock_auth_manager__get_authorized_dag_ids( - mock_get_auth_manager, {permitted_dag_model.dag_id} + mock_get_auth_manager, {"dag_id1"} ) set_mock_auth_manager__batch_is_authorized_dag(mock_get_auth_manager, True) @@ -456,10 +471,11 @@ def test_bundle_name_join_condition_for_import_errors( assert response_json["import_errors"][0]["filename"] == FILENAME1 # Now test that removing the bundle_name from the DagModel causes the import error to not be returned - permitted_dag_model.bundle_name = "another_bundle_name" + dag_model1 = session.get(DagModel, "dag_id1") session.add(DagBundleModel(name="another_bundle_name")) session.flush() - session.merge(permitted_dag_model) + dag_model1.bundle_name = "another_bundle_name" + session.merge(dag_model1) session.commit() response2 = test_client.get("/importErrors") @@ -469,3 +485,18 @@ def test_bundle_name_join_condition_for_import_errors( response_json2 = response2.json() assert response_json2["total_entries"] == 0 assert response_json2["import_errors"] == [] + + @mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager") + def test_get_import_errors__no_dag_in_dagmodel(self, mock_get_auth_manager, test_client, import_errors): + """Test import errors are returned when no DAG exists in DagModel.""" + set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager, set()) + + response = test_client.get("/importErrors") + + assert response.status_code == 200 + response_json = response.json() + assert response_json["total_entries"] == 3 + filenames = [error["filename"] for error in response_json["import_errors"]] + assert FILENAME1 in filenames + assert FILENAME2 in filenames + assert FILENAME3 in filenames