diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py b/airflow-core/src/airflow/api_fastapi/common/dagbag.py index af3a05d98af3a..d87aca49a524a 100644 --- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py +++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py @@ -24,6 +24,7 @@ from airflow.configuration import conf from airflow.models.dagbag import DBDagBag +from airflow.models.serialized_dag import SerializedDagModel if TYPE_CHECKING: from airflow.models.dagrun import DagRun @@ -115,4 +116,28 @@ def get_dag_for_run_or_latest_version( return dag +def resolve_run_on_latest_version( + explicit_value: bool | None, + dag_id: str, + session: Session, + fallback: bool = False, +) -> bool: + """ + Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback. + + :param explicit_value: Value from the API request body (or None if not specified). + :param dag_id: The DAG ID to look up. + :param session: Database session. + :param fallback: Default to use when neither DAG-level nor global config is set. + Clear/rerun endpoints use False (the historical default). + Backfill endpoint uses True (the historical default for backfills). + """ + if explicit_value is not None: + return explicit_value + serialized = SerializedDagModel.get_dag(dag_id, session=session) + if serialized and serialized.rerun_with_latest_version is not None: + return serialized.rerun_with_latest_version + return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback) + + DagBagDep = Annotated[DBDagBag, Depends(dag_bag_from_app)] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py index 4e4bc90dceb97..0dba6086e0aee 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -25,6 +25,7 @@ from sqlalchemy.orm import joinedload from airflow._shared.timezones import timezone +from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version from airflow.api_fastapi.common.db.common import ( SessionDep, paginated_select, @@ -42,7 +43,6 @@ create_openapi_http_exception_doc, ) from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import DagNotFound, DagRunTypeNotAllowed from airflow.models import DagRun diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 2f1c1d3d09bc9..68120ace15380 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -38,6 +38,7 @@ get_dag_for_run, get_dag_for_run_or_latest_version, get_latest_version_of_dag, + resolve_run_on_latest_version, ) from airflow.api_fastapi.common.db.common import SessionDep, apply_filters_to_select, paginated_select from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation @@ -96,7 +97,6 @@ ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version from airflow.api_fastapi.core_api.services.public.task_instances import ( BulkTaskInstanceService, _get_task_group_task_instances, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py index e016cb908c41f..37d3e5e08dfed 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py @@ -35,8 +35,6 @@ BulkUpdateAction, T, ) -from airflow.configuration import conf -from airflow.models.serialized_dag import SerializedDagModel class BulkService(Generic[T], ABC): @@ -122,27 +120,3 @@ def apply_patch_with_update_mask( setattr(model, key, value) return model - - -def resolve_run_on_latest_version( - explicit_value: bool | None, - dag_id: str, - session: Session, - fallback: bool = False, -) -> bool: - """ - Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback. - - :param explicit_value: Value from the API request body (or None if not specified). - :param dag_id: The DAG ID to look up. - :param session: Database session. - :param fallback: Default to use when neither DAG-level nor global config is set. - Clear/rerun endpoints use False (the historical default). - Backfill endpoint uses True (the historical default for backfills). - """ - if explicit_value is not None: - return explicit_value - serialized = SerializedDagModel.get_dag(dag_id, session=session) - if serialized and serialized.rerun_with_latest_version is not None: - return serialized.rerun_with_latest_version - return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index c473b908b0eca..ea9d3ca5ce25a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -35,7 +35,12 @@ set_dag_run_state_to_success, ) from airflow.api_fastapi.auth.managers.models.base_user import BaseUser -from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag +from airflow.api_fastapi.common.dagbag import ( + DagBagDep, + get_dag_for_run, + get_latest_version_of_dag, + resolve_run_on_latest_version, +) from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation from airflow.api_fastapi.core_api.datamodels.common import ( BulkActionNotOnExistence, @@ -47,7 +52,7 @@ ) from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, DagRunMutableStates from airflow.api_fastapi.core_api.datamodels.task_instances import NewTaskResponse -from airflow.api_fastapi.core_api.services.public.common import BulkService, resolve_run_on_latest_version +from airflow.api_fastapi.core_api.services.public.common import BulkService from airflow.listeners.listener import get_listener_manager from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 1092369e913bc..31aeb96c63e72 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -26,7 +26,7 @@ from sqlalchemy.exc import NoResultFound from airflow.api.common.trigger_dag import trigger_dag -from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run +from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, resolve_run_on_latest_version from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.types import UtcDateTime from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT @@ -207,7 +207,8 @@ def clear_dag_run( ) dag = get_dag_for_run(dag_bag, dag_run=dag_run, session=session) - dag.clear(run_id=run_id) + resolved_run_on_latest = resolve_run_on_latest_version(None, dag_id, session) + dag.clear(run_id=run_id, run_on_latest_version=resolved_run_on_latest) @router.get( diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py b/airflow-core/src/airflow/cli/commands/backfill_command.py index 5ce8cc9224291..d76d435a391a4 100644 --- a/airflow-core/src/airflow/cli/commands/backfill_command.py +++ b/airflow-core/src/airflow/cli/commands/backfill_command.py @@ -24,7 +24,7 @@ from tabulate import tabulate from airflow import settings -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version +from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version from airflow.cli.simple_table import AirflowConsole from airflow.exceptions import AirflowConfigException from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index ad11f764d4839..d953e950dea6a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -32,8 +32,8 @@ from airflow._shared.timezones import timezone from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser +from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version from airflow.exceptions import ParamValidationError from airflow.models import DagModel, DagRun, Log from airflow.models.asset import AssetEvent, AssetModel diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index 5d3c37c9b521d..fbe7cc56e4a08 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -17,6 +17,8 @@ from __future__ import annotations +from unittest import mock + import pytest import time_machine from fastapi import Request @@ -28,6 +30,7 @@ from airflow.models import DagModel from airflow.models.dagrun import DagRun from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.serialization.definitions.dag import SerializedDAG from airflow.timetables.trigger import CronPartitionTimetable from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunType @@ -381,6 +384,34 @@ def test_dag_run_not_found(self, client): assert response.status_code == 404 + def test_dag_run_clear_invokes_resolver(self, client, session, dag_maker): + """Clearing resolves run_on_latest_version (no explicit override) and forwards it to dag.clear.""" + dag_id = "test_clear_invokes_resolver" + run_id = "test_run_id" + + with dag_maker(dag_id=dag_id, session=session, serialized=True): + EmptyOperator(task_id="test_task") + dag_maker.create_dagrun(run_id=run_id, state=DagRunState.SUCCESS) + session.commit() + + with ( + mock.patch( + "airflow.api_fastapi.execution_api.routes.dag_runs.resolve_run_on_latest_version", + return_value=mock.sentinel.resolved, + ) as mock_resolver, + mock.patch.object(SerializedDAG, "clear", autospec=True) as mock_clear, + ): + response = client.post(f"/execution/dag-runs/{dag_id}/{run_id}/clear") + + assert response.status_code == 204 + mock_resolver.assert_called_once() + # First positional arg is the explicit override; operator does not pass one. + assert mock_resolver.call_args.args[0] is None + # The resolved value must reach dag.clear, not be silently dropped. + mock_clear.assert_called_once() + assert mock_clear.call_args.kwargs["run_id"] == run_id + assert mock_clear.call_args.kwargs["run_on_latest_version"] is mock.sentinel.resolved + class TestDagRunDetail: def setup_method(self):