Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from airflow.configuration import conf
from airflow.models.dagbag import DBDagBag
from airflow.models.serialized_dag import SerializedDagModel

if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -115,4 +116,28 @@ def get_dag_for_run_or_latest_version(
return dag


def resolve_run_on_latest_version(
explicit_value: bool | None,
dag_id: str,
session: Session,
fallback: bool = False,
) -> bool:
"""
Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback.

:param explicit_value: Value from the API request body (or None if not specified).
:param dag_id: The DAG ID to look up.
:param session: Database session.
:param fallback: Default to use when neither DAG-level nor global config is set.
Clear/rerun endpoints use False (the historical default).
Backfill endpoint uses True (the historical default for backfills).
"""
if explicit_value is not None:
return explicit_value
serialized = SerializedDagModel.get_dag(dag_id, session=session)
if serialized and serialized.rerun_with_latest_version is not None:
return serialized.rerun_with_latest_version
return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback)


DagBagDep = Annotated[DBDagBag, Depends(dag_bag_from_app)]
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from sqlalchemy.orm import joinedload

from airflow._shared.timezones import timezone
from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
Expand All @@ -42,7 +43,6 @@
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill
from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound, DagRunTypeNotAllowed
from airflow.models import DagRun
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
get_dag_for_run,
get_dag_for_run_or_latest_version,
get_latest_version_of_dag,
resolve_run_on_latest_version,
)
from airflow.api_fastapi.common.db.common import SessionDep, apply_filters_to_select, paginated_select
from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation
Expand Down Expand Up @@ -96,7 +97,6 @@
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
from airflow.api_fastapi.core_api.services.public.task_instances import (
BulkTaskInstanceService,
_get_task_group_task_instances,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
BulkUpdateAction,
T,
)
from airflow.configuration import conf
from airflow.models.serialized_dag import SerializedDagModel


class BulkService(Generic[T], ABC):
Expand Down Expand Up @@ -122,27 +120,3 @@ def apply_patch_with_update_mask(
setattr(model, key, value)

return model


def resolve_run_on_latest_version(
explicit_value: bool | None,
dag_id: str,
session: Session,
fallback: bool = False,
) -> bool:
"""
Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback.

:param explicit_value: Value from the API request body (or None if not specified).
:param dag_id: The DAG ID to look up.
:param session: Database session.
:param fallback: Default to use when neither DAG-level nor global config is set.
Clear/rerun endpoints use False (the historical default).
Backfill endpoint uses True (the historical default for backfills).
"""
if explicit_value is not None:
return explicit_value
serialized = SerializedDagModel.get_dag(dag_id, session=session)
if serialized and serialized.rerun_with_latest_version is not None:
return serialized.rerun_with_latest_version
return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
set_dag_run_state_to_success,
)
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
from airflow.api_fastapi.common.dagbag import (
DagBagDep,
get_dag_for_run,
get_latest_version_of_dag,
resolve_run_on_latest_version,
)
from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
Expand All @@ -47,7 +52,7 @@
)
from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, DagRunMutableStates
from airflow.api_fastapi.core_api.datamodels.task_instances import NewTaskResponse
from airflow.api_fastapi.core_api.services.public.common import BulkService, resolve_run_on_latest_version
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.listeners.listener import get_listener_manager
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sqlalchemy.exc import NoResultFound

from airflow.api.common.trigger_dag import trigger_dag
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, resolve_run_on_latest_version
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
Expand Down Expand Up @@ -207,7 +207,8 @@ def clear_dag_run(
)
dag = get_dag_for_run(dag_bag, dag_run=dag_run, session=session)

dag.clear(run_id=run_id)
resolved_run_on_latest = resolve_run_on_latest_version(None, dag_id, session)
dag.clear(run_id=run_id, run_on_latest_version=resolved_run_on_latest)


@router.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from tabulate import tabulate

from airflow import settings
from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowConfigException
from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from airflow._shared.timezones import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun, Log
from airflow.models.asset import AssetEvent, AssetModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

from unittest import mock

import pytest
import time_machine
from fastapi import Request
Expand All @@ -28,6 +30,7 @@
from airflow.models import DagModel
from airflow.models.dagrun import DagRun
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -381,6 +384,34 @@ def test_dag_run_not_found(self, client):

assert response.status_code == 404

def test_dag_run_clear_invokes_resolver(self, client, session, dag_maker):
"""Clearing resolves run_on_latest_version (no explicit override) and forwards it to dag.clear."""
dag_id = "test_clear_invokes_resolver"
run_id = "test_run_id"

with dag_maker(dag_id=dag_id, session=session, serialized=True):
EmptyOperator(task_id="test_task")
dag_maker.create_dagrun(run_id=run_id, state=DagRunState.SUCCESS)
session.commit()

with (
mock.patch(
"airflow.api_fastapi.execution_api.routes.dag_runs.resolve_run_on_latest_version",
return_value=mock.sentinel.resolved,
) as mock_resolver,
mock.patch.object(SerializedDAG, "clear", autospec=True) as mock_clear,
):
response = client.post(f"/execution/dag-runs/{dag_id}/{run_id}/clear")

assert response.status_code == 204
mock_resolver.assert_called_once()
Comment thread
kaxil marked this conversation as resolved.
# First positional arg is the explicit override; operator does not pass one.
assert mock_resolver.call_args.args[0] is None
# The resolved value must reach dag.clear, not be silently dropped.
mock_clear.assert_called_once()
assert mock_clear.call_args.kwargs["run_id"] == run_id
assert mock_clear.call_args.kwargs["run_on_latest_version"] is mock.sentinel.resolved


class TestDagRunDetail:
def setup_method(self):
Expand Down
Loading