From b6c6425d569ff818ff74715a74c51de94f779421 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Fri, 29 May 2026 11:54:58 +0200 Subject: [PATCH 1/7] API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors ``POST /dags/{dag_id}/clearTaskInstances`` for Dag Runs: a single round-trip clears N runs and (optionally) attaches a note in the same transaction, removing the per-run fan-out the UI does today. The URL sits at the parent-Dag level (``/clearDagRuns`` not ``/dagRuns/clear``) on purpose, to match the existing ``clearTaskInstances`` convention. To register at that prefix without collapsing into ``task_instances_router``, ``dag_run.py`` gains a sibling router ``dag_run_at_dag_router`` with prefix ``/dags/{dag_id}``. URL ``dag_id`` is concrete or ``~``. With ``~``, every entry in ``dag_runs`` must carry its own ``dag_id``. With a specific Dag, entries may omit ``dag_id`` and inherit from the URL (and the route rejects mismatches with 400). Duplicate ``(dag_id, run_id)`` entries collapse to one operation, matching ``BulkDagRunService``'s ``handle_bulk_delete`` semantics. ``dry_run`` is the safe default — returns the union of affected task instances across the listed runs (or the ``NewTaskResponse`` placeholders for the ``only_new`` path) without touching state. Real clear returns a ``DAGRunCollectionResponse`` with the post-clear runs. To keep both endpoints in sync, the per-run lookup, dry-run TI computation, and the clear+note step are pulled out of the single ``clear_dag_run`` route into ``services/public/dag_run.py`` as ``get_dag_run_and_dag_for_clear`` / ``dry_run_clear_dag_run`` / ``perform_clear_dag_run``. The single-run route now composes them instead of duplicating the logic. Auth uses a new ``requires_access_dag_run_clear_bulk`` dependency modelled on ``requires_access_dag_run_bulk`` — same per-Dag team caching, same wildcard-then-400 contract. --- .../core_api/datamodels/dag_run.py | 26 ++- .../openapi/v2-rest-api-generated.yaml | 123 +++++++++++- .../core_api/routes/public/__init__.py | 3 +- .../core_api/routes/public/dag_run.py | 177 +++++++++++------- .../airflow/api_fastapi/core_api/security.py | 39 +++- .../core_api/services/public/dag_run.py | 106 ++++++++++- .../airflow/ui/openapi-gen/queries/common.ts | 1 + .../airflow/ui/openapi-gen/queries/queries.ts | 24 ++- .../ui/openapi-gen/requests/schemas.gen.ts | 60 +++++- .../ui/openapi-gen/requests/services.gen.ts | 36 +++- .../ui/openapi-gen/requests/types.gen.ts | 58 +++++- .../core_api/routes/public/test_dag_run.py | 160 ++++++++++++++++ .../airflowctl/api/datamodels/generated.py | 38 +++- 13 files changed, 756 insertions(+), 95 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 8373847328d34..2cfc4a0001738 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -57,8 +57,8 @@ class BulkDAGRunBody(StrictBaseModel): dag_id: str | None = None -class DAGRunClearBody(StrictBaseModel): - """Dag Run serializer for clear endpoint body.""" +class BaseDAGRunClear(StrictBaseModel): + """Shared options for the single-run and bulk Dag Run clear endpoints.""" dry_run: bool = True only_failed: bool = False @@ -68,25 +68,31 @@ class DAGRunClearBody(StrictBaseModel): ) run_on_latest_version: bool | None = Field( default=None, - description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. " + description="(Experimental) Run on the latest bundle version of the Dag after clearing. " "If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, " "then the ``[core] rerun_with_latest_version`` config option, " - "and finally ``False`` (the historical default for clear/rerun).", - ) - note: str | None = Field( - default=None, - max_length=1000, + "and finally ``False``.", ) + note: str | None = Field(default=None, max_length=1000) @model_validator(mode="before") @classmethod - def validate_model(cls, data: Any) -> Any: - """Validate clear Dag run form.""" + def validate_only_new_only_failed_mutually_exclusive(cls, data: Any) -> Any: if data.get("only_new") and data.get("only_failed"): raise ValueError("only_new and only_failed are mutually exclusive") return data +class DAGRunClearBody(BaseDAGRunClear): + """Dag Run serializer for clear endpoint body.""" + + +class BulkDAGRunClearBody(BaseDAGRunClear): + """Request body for the bulk clear Dag Runs endpoint.""" + + dag_runs: list[BulkDAGRunBody] = Field(min_length=1) + + class DAGRunResponse(BaseModel): """Dag Run serializer for responses.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5869ad434a54b..da4727be51ac8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2777,6 +2777,80 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/clearDagRuns: + post: + tags: + - DagRun + summary: Bulk Clear Dag Runs + description: 'Clear multiple Dag Runs in a single request. + + + When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must + + carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, + + entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` + + returns the union of affected task instances across the listed runs + + without mutating state; the real clear returns the post-clear Dag Runs.' + operationId: bulk_clear_dag_runs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/BulkDAGRunClearBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' + - $ref: '#/components/schemas/DAGRunCollectionResponse' + title: Response Bulk Clear Dag Runs + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: post: tags: @@ -11883,6 +11957,49 @@ components: - dag_run_id title: BulkDAGRunBody description: Request body for bulk delete operations on Dag Runs. + BulkDAGRunClearBody: + properties: + dry_run: + type: boolean + title: Dry Run + default: true + only_failed: + type: boolean + title: Only Failed + default: false + only_new: + type: boolean + title: Only New + description: Only queue newly added tasks in the latest Dag version without + clearing existing tasks. + default: false + run_on_latest_version: + anyOf: + - type: boolean + - type: 'null' + title: Run On Latest Version + description: (Experimental) Run on the latest bundle version of the Dag + after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` + parameter, then the ``[core] rerun_with_latest_version`` config option, + and finally ``False``. + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note + dag_runs: + items: + $ref: '#/components/schemas/BulkDAGRunBody' + type: array + minItems: 1 + title: Dag Runs + additionalProperties: false + type: object + required: + - dag_runs + title: BulkDAGRunClearBody + description: Request body for the bulk clear Dag Runs endpoint. BulkDeleteAction_BulkDAGRunBody_: properties: action: @@ -13093,9 +13210,9 @@ components: - type: 'null' title: Run On Latest Version description: (Experimental) Run on the latest bundle version of the Dag - after clearing the Dag Run. If not specified, falls back to the DAG-level - ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` - config option, and finally ``False`` (the historical default for clear/rerun). + after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` + parameter, then the ``[core] rerun_with_latest_version`` config option, + and finally ``False``. note: anyOf: - type: string diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py index 4e6ebba117837..e305f0ac248e4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -28,7 +28,7 @@ from airflow.api_fastapi.core_api.routes.public.config import config_router from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_parsing import dag_parsing_router -from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router +from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_at_dag_router, dag_run_router from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router from airflow.api_fastapi.core_api.routes.public.dag_tags import dag_tags_router @@ -69,6 +69,7 @@ authenticated_router.include_router(backfills_router) authenticated_router.include_router(connections_router) authenticated_router.include_router(dag_run_router) +authenticated_router.include_router(dag_run_at_dag_router) authenticated_router.include_router(dag_sources_router) authenticated_router.include_router(dag_stats_router) authenticated_router.include_router(config_router) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index aa82b14fefd58..a85d039e7ef71 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -47,7 +47,6 @@ attach_dag_versions_to_runs, eager_load_dag_run_for_list, ) -from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation from airflow.api_fastapi.common.parameters import ( FilterOptionEnum, FilterParam, @@ -79,6 +78,7 @@ from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse from airflow.api_fastapi.core_api.datamodels.dag_run import ( BulkDAGRunBody, + BulkDAGRunClearBody, DAGRunClearBody, DAGRunCollectionResponse, DagRunMutableStates, @@ -99,22 +99,28 @@ requires_access_asset, requires_access_dag, requires_access_dag_run_bulk, + requires_access_dag_run_clear_bulk, +) +from airflow.api_fastapi.core_api.services.public.dag_run import ( + BulkDagRunService, + DagRunWaiter, + dry_run_clear_dag_run, + get_dag_run_and_dag_for_clear, + perform_clear_dag_run, ) -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version -from airflow.api_fastapi.core_api.services.public.dag_run import BulkDagRunService, DagRunWaiter from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent from airflow.models.dag_version import DagVersion -from airflow.models.taskinstance import TaskInstance -from airflow.utils.state import DagRunState, TaskInstanceState +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType log = structlog.get_logger(__name__) dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") +dag_run_at_dag_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}") @dag_run_router.get( @@ -321,81 +327,118 @@ def clear_dag_run( session: SessionDep, user: GetUserDep, ) -> ClearTaskInstanceCollectionResponse | DAGRunResponse: - dag_run = session.scalar( - select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_model)) + dag_run, dag = get_dag_run_and_dag_for_clear( + session=session, dag_bag=dag_bag, dag_id=dag_id, dag_run_id=dag_run_id ) - if dag_run is None: - raise HTTPException( - status.HTTP_404_NOT_FOUND, - f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + + if body.dry_run: + task_instances = dry_run_clear_dag_run( + session=session, + dag_bag=dag_bag, + dag_id=dag_id, + dag_run_id=dag_run_id, + only_failed=body.only_failed, + only_new=body.only_new, + ) + return ClearTaskInstanceCollectionResponse( + task_instances=task_instances, + total_entries=len(task_instances), ) - dag = dag_bag.get_dag_for_run(dag_run, session=session) + return perform_clear_dag_run( + session=session, + dag=dag, + dag_run=dag_run, + dag_id=dag_id, + only_failed=body.only_failed, + only_new=body.only_new, + run_on_latest_version=body.run_on_latest_version, + note=body.note, + user=user, + ) - if not dag: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") - resolved_run_on_latest = resolve_run_on_latest_version(body.run_on_latest_version, dag_id, session) +@dag_run_at_dag_router.post( + "/clearDagRuns", + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag_run_clear_bulk()), Depends(action_logging())], +) +def bulk_clear_dag_runs( + dag_id: str, + body: BulkDAGRunClearBody, + dag_bag: DagBagDep, + session: SessionDep, + user: GetUserDep, +) -> ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse: + """Clear multiple Dag Runs in a single request.""" + url_dag_id_is_wildcard = dag_id == "~" + + seen_targets: set[tuple[str, str]] = set() + resolved_targets: list[tuple[str, str]] = [] + for run in body.dag_runs: + if url_dag_id_is_wildcard: + if not run.dag_id or run.dag_id == "~": + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"When the URL dag_id is '~', every entry must provide a concrete dag_id " + f"(missing on dag_run_id: {run.dag_run_id!r}).", + ) + target = (run.dag_id, run.dag_run_id) + else: + entity_dag_id = run.dag_id or dag_id + if entity_dag_id != dag_id: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Entry dag_id {entity_dag_id!r} does not match the URL dag_id {dag_id!r}.", + ) + target = (dag_id, run.dag_run_id) + if target not in seen_targets: + seen_targets.add(target) + resolved_targets.append(target) if body.dry_run: - if body.only_new: - # Determine "new" tasks by TI existence: a task is new when the latest Dag - # version contains it but the current run has no TaskInstance row for it yet. - # This is more reliable than the version-comparison approach used by - # dag.clear(only_new=True, dry_run=True) which returns an empty set when - # created_dag_version_id is None (e.g. LocalDagBundle). - latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session) - existing_task_ids = set( - session.scalars( - select(TaskInstance.task_id).where( - TaskInstance.dag_id == dag_id, - TaskInstance.run_id == dag_run_id, - ) - ).all() + affected: list[TaskInstanceResponse | NewTaskResponse] = [] + for run_dag_id, run_id in resolved_targets: + get_dag_run_and_dag_for_clear( + session=session, dag_bag=dag_bag, dag_id=run_dag_id, dag_run_id=run_id ) - new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids) - task_instances: list[TaskInstanceResponse | NewTaskResponse] = [ - NewTaskResponse(task_id=task_id, task_display_name=task_id) for task_id in new_task_ids - ] - else: - # Query task instances directly with proper eager loading so that all - # relationships required by TaskInstanceResponse (dag_run, dag_model, - # dag_version, rendered_task_instance_fields) are populated. - # dag.clear(dry_run=True) returns raw ORM objects without these joins. - ti_query = eager_load_TI_and_TIH_for_validation(select(TaskInstance)) - ti_query = ti_query.where( - TaskInstance.dag_id == dag_id, - TaskInstance.run_id == dag_run_id, - ) - if body.only_failed: - ti_query = ti_query.where( - TaskInstance.state.in_([TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]) + affected.extend( + dry_run_clear_dag_run( + session=session, + dag_bag=dag_bag, + dag_id=run_dag_id, + dag_run_id=run_id, + only_failed=body.only_failed, + only_new=body.only_new, ) - task_instances = list(session.scalars(ti_query)) - + ) return ClearTaskInstanceCollectionResponse( - task_instances=task_instances, - total_entries=len(task_instances), + task_instances=affected, + total_entries=len(affected), ) - dag.clear( - run_id=dag_run_id, - task_ids=None, - only_new=body.only_new, - only_failed=body.only_failed, - run_on_latest_version=resolved_run_on_latest, - session=session, + cleared_runs: list[DagRun] = [] + for run_dag_id, run_id in resolved_targets: + dag_run, dag = get_dag_run_and_dag_for_clear( + session=session, dag_bag=dag_bag, dag_id=run_dag_id, dag_run_id=run_id + ) + cleared_runs.append( + perform_clear_dag_run( + session=session, + dag=dag, + dag_run=dag_run, + dag_id=run_dag_id, + only_failed=body.only_failed, + only_new=body.only_new, + run_on_latest_version=body.run_on_latest_version, + note=body.note, + user=user, + ) + ) + return DAGRunCollectionResponse( + dag_runs=cleared_runs, + total_entries=len(cleared_runs), ) - dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) - if not dag_run_cleared: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found after clearing") - if body.note is not None: - if dag_run_cleared.dag_run_note is None: - dag_run_cleared.note = (body.note, user.get_id()) - else: - dag_run_cleared.dag_run_note.content = body.note - dag_run_cleared.dag_run_note.user_id = user.get_id() - return dag_run_cleared @dag_run_router.get( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index bb3413b7f05a5..d0e947b8f5f4d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -63,7 +63,7 @@ BulkUpdateAction, ) from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody -from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody +from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, BulkDAGRunClearBody from airflow.api_fastapi.core_api.datamodels.pools import PoolBody from airflow.api_fastapi.core_api.datamodels.variables import VariableBody from airflow.configuration import conf @@ -780,6 +780,43 @@ def inner( return inner +def requires_access_dag_run_clear_bulk() -> Callable[[BulkDAGRunClearBody, BaseUser, str], None]: + def inner( + body: BulkDAGRunClearBody, + user: GetUserDep, + dag_id: str, + ) -> None: + resolved_dag_ids: set[str] = set() + for run in body.dag_runs: + entity_dag_id = run.dag_id or dag_id + if entity_dag_id and entity_dag_id != "~": + resolved_dag_ids.add(entity_dag_id) + + dag_id_to_team = {d: DagModel.get_team_name(d) for d in resolved_dag_ids} + + requests: list[IsAuthorizedDagRequest] = [] + for run in body.dag_runs: + entity_dag_id = run.dag_id or dag_id + if not entity_dag_id or entity_dag_id == "~": + continue + requests.append( + { + "method": "PUT", + "access_entity": DagAccessEntity.RUN, + "details": DagDetails(id=entity_dag_id, team_name=dag_id_to_team.get(entity_dag_id)), + } + ) + + _requires_access( + is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_dag( + requests=requests, + user=user, + ) + ) + + return inner + + def requires_access_asset(method: ResourceMethod) -> Callable[[Request, BaseUser], None]: def inner( request: Request, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index 4c5a5b090b64a..3c9d8cd7eba72 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -25,10 +25,13 @@ import attrs import structlog -from fastapi import status +from fastapi import HTTPException, status from sqlalchemy import select, tuple_ -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, joinedload +from airflow.api_fastapi.auth.managers.models.base_user import BaseUser +from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag +from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation from airflow.api_fastapi.core_api.datamodels.common import ( BulkActionNotOnExistence, BulkActionResponse, @@ -38,18 +41,113 @@ BulkUpdateAction, ) from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, DagRunMutableStates -from airflow.api_fastapi.core_api.services.public.common import BulkService +from airflow.api_fastapi.core_api.datamodels.task_instances import NewTaskResponse +from airflow.api_fastapi.core_api.services.public.common import BulkService, resolve_run_on_latest_version from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance from airflow.models.xcom import XCOM_RETURN_KEY, XComModel from airflow.utils.session import create_session_async -from airflow.utils.state import State +from airflow.utils.state import State, TaskInstanceState if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterator + from airflow.serialization.definitions.dag import SerializedDAG + log = structlog.get_logger(__name__) +def get_dag_run_and_dag_for_clear( + *, + session: Session, + dag_bag: DagBagDep, + dag_id: str, + dag_run_id: str, +) -> tuple[DagRun, SerializedDAG]: + dag_run = session.scalar( + select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_model)) + ) + if dag_run is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + dag = dag_bag.get_dag_for_run(dag_run, session=session) + if not dag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + return dag_run, dag + + +def dry_run_clear_dag_run( + *, + session: Session, + dag_bag: DagBagDep, + dag_id: str, + dag_run_id: str, + only_failed: bool, + only_new: bool, +) -> list[Any]: + if only_new: + # ``dag.clear(only_new=True, dry_run=True)`` returns nothing when + # ``created_dag_version_id`` is None (e.g. LocalDagBundle), so derive new + # tasks from TI existence instead. + latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session) + existing_task_ids = set( + session.scalars( + select(TaskInstance.task_id).where( + TaskInstance.dag_id == dag_id, + TaskInstance.run_id == dag_run_id, + ) + ).all() + ) + new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids) + return [NewTaskResponse(task_id=task_id, task_display_name=task_id) for task_id in new_task_ids] + + ti_query = eager_load_TI_and_TIH_for_validation(select(TaskInstance)) + ti_query = ti_query.where( + TaskInstance.dag_id == dag_id, + TaskInstance.run_id == dag_run_id, + ) + if only_failed: + ti_query = ti_query.where( + TaskInstance.state.in_([TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]) + ) + return list(session.scalars(ti_query)) + + +def perform_clear_dag_run( + *, + session: Session, + dag: SerializedDAG, + dag_run: DagRun, + dag_id: str, + only_failed: bool, + only_new: bool, + run_on_latest_version: bool | None, + note: str | None, + user: BaseUser, +) -> DagRun: + resolved_run_on_latest = resolve_run_on_latest_version(run_on_latest_version, dag_id, session) + dag.clear( + run_id=dag_run.run_id, + task_ids=None, + only_new=only_new, + only_failed=only_failed, + run_on_latest_version=resolved_run_on_latest, + session=session, + ) + dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) + if not dag_run_cleared: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found after clearing") + if note is not None: + if dag_run_cleared.dag_run_note is None: + dag_run_cleared.note = (note, user.get_id()) + else: + dag_run_cleared.dag_run_note.content = note + dag_run_cleared.dag_run_note.user_id = user.get_id() + return dag_run_cleared + + @attrs.define class DagRunWaiter: """Wait for the specified dag run to finish, and collect info from it.""" diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 632d311157631..58cf4eca1f95b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1035,6 +1035,7 @@ export type ConnectionServicePostConnectionMutationResult = Awaited>; export type ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited>; export type DagRunServiceTriggerDagRunMutationResult = Awaited>; +export type DagRunServiceBulkClearDagRunsMutationResult = Awaited>; export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited>; export type DagServiceFavoriteDagMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 01e3d78e69cd7..496f69f4cdb44 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2,7 +2,7 @@ import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, TaskStatePatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, BulkDAGRunClearBody, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, TaskStatePatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -2176,6 +2176,28 @@ export const useDagRunServiceTriggerDagRun = ({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); /** +* Bulk Clear Dag Runs +* Clear multiple Dag Runs in a single request. +* +* When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must +* carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, +* entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` +* returns the union of affected task instances across the listed runs +* without mutating state; the real clear returns the post-clear Dag Runs. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceBulkClearDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.bulkClearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Clear Dag Run * @param data The data for the request. * @param data.dagId diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 28e04804ca913..2c307f9cf9642 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -971,6 +971,64 @@ export const $BulkDAGRunBody = { description: 'Request body for bulk delete operations on Dag Runs.' } as const; +export const $BulkDAGRunClearBody = { + properties: { + dry_run: { + type: 'boolean', + title: 'Dry Run', + default: true + }, + only_failed: { + type: 'boolean', + title: 'Only Failed', + default: false + }, + only_new: { + type: 'boolean', + title: 'Only New', + description: 'Only queue newly added tasks in the latest Dag version without clearing existing tasks.', + default: false + }, + run_on_latest_version: { + anyOf: [ + { + type: 'boolean' + }, + { + type: 'null' + } + ], + title: 'Run On Latest Version', + description: '(Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``.' + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' + }, + dag_runs: { + items: { + '$ref': '#/components/schemas/BulkDAGRunBody' + }, + type: 'array', + minItems: 1, + title: 'Dag Runs' + } + }, + additionalProperties: false, + type: 'object', + required: ['dag_runs'], + title: 'BulkDAGRunClearBody', + description: 'Request body for the bulk clear Dag Runs endpoint.' +} as const; + export const $BulkDeleteAction_BulkDAGRunBody_ = { properties: { action: { @@ -2774,7 +2832,7 @@ export const $DAGRunClearBody = { } ], title: 'Run On Latest Version', - description: '(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun).' + description: '(Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``.' }, note: { anyOf: [ diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 3e65983530399..3731b3c3df962 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, PatchTaskStateData, PatchTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, BulkClearDagRunsData, BulkClearDagRunsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, PatchTaskStateData, PatchTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -1146,6 +1146,40 @@ export class DagRunService { }); } + /** + * Bulk Clear Dag Runs + * Clear multiple Dag Runs in a single request. + * + * When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must + * carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, + * entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` + * returns the union of affected task instances across the listed runs + * without mutating state; the real clear returns the post-clear Dag Runs. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static bulkClearDagRuns(data: BulkClearDagRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/clearDagRuns', + path: { + dag_id: data.dagId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 400: 'Bad Request', + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Clear Dag Run * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 25a5fc6e76df4..9e6b0ebede751 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -301,6 +301,24 @@ export type BulkDAGRunBody = { dag_id?: string | null; }; +/** + * Request body for the bulk clear Dag Runs endpoint. + */ +export type BulkDAGRunClearBody = { + dry_run?: boolean; + only_failed?: boolean; + /** + * Only queue newly added tasks in the latest Dag version without clearing existing tasks. + */ + only_new?: boolean; + /** + * (Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``. + */ + run_on_latest_version?: boolean | null; + note?: string | null; + dag_runs: Array; +}; + export type BulkDeleteAction_BulkDAGRunBody_ = { /** * The action to be performed on the entities. @@ -748,7 +766,7 @@ export type DAGRunClearBody = { */ only_new?: boolean; /** - * (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun). + * (Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``. */ run_on_latest_version?: boolean | null; note?: string | null; @@ -2924,6 +2942,13 @@ export type GetUpstreamAssetEventsData = { export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; +export type BulkClearDagRunsData = { + dagId: string; + requestBody: BulkDAGRunClearBody; +}; + +export type BulkClearDagRunsResponse = ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; + export type ClearDagRunData = { dagId: string; dagRunId: string; @@ -5395,6 +5420,37 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/clearDagRuns': { + post: { + req: BulkClearDagRunsData; + res: { + /** + * Successful Response + */ + 200: ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { post: { req: ClearDagRunData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 318544014e3c1..9bbb5d066d357 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1882,6 +1882,166 @@ def test_clear_dag_run_only_new_and_only_failed_mutually_exclusive(self, test_cl assert response.status_code == 422 +class TestBulkClearDagRuns: + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_specific_dag(self, test_client, session): + """Specific dag_id in URL, dag_run_id in body — clears both runs and queues them.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": DAG1_RUN2_ID}], + }, + ) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == 2 + returned_run_ids = sorted(run["dag_run_id"] for run in body["dag_runs"]) + assert returned_run_ids == sorted([DAG1_RUN1_ID, DAG1_RUN2_ID]) + for run in body["dag_runs"]: + assert run["state"] == "queued" + assert run["dag_id"] == DAG1_ID + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_wildcard_across_dags(self, test_client, session): + """``~`` URL with per-entity dag_id — clears runs across Dags in one call.""" + response = test_client.post( + "/dags/~/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}, + ], + }, + ) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == 2 + pairs = sorted((run["dag_id"], run["dag_run_id"]) for run in body["dag_runs"]) + assert pairs == sorted([(DAG1_ID, DAG1_RUN1_ID), (DAG2_ID, DAG2_RUN1_ID)]) + for run in body["dag_runs"]: + assert run["state"] == "queued" + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_dry_run_collects_affected_tis_across_runs(self, test_client, session): + """Dry-run returns the union of affected TIs across the listed runs without mutating state.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": True, + "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": DAG1_RUN2_ID}], + }, + ) + assert response.status_code == 200 + body = response.json() + # Both DAG1 runs have two task instances each. + assert body["total_entries"] == 4 + run_ids_in_response = {ti["dag_run_id"] for ti in body["task_instances"]} + assert run_ids_in_response == {DAG1_RUN1_ID, DAG1_RUN2_ID} + # No state changes — dry_run never writes. + dag_run = session.scalar( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID) + ) + assert dag_run.state == DAG1_RUN1_STATE + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_dry_run_only_failed_filters(self, test_client): + """``only_failed=True`` shrinks the dry-run preview to failed TIs only.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": True, + "only_failed": True, + "dag_runs": [{"dag_run_id": DAG1_RUN2_ID}], + }, + ) + assert response.status_code == 200 + body = response.json() + assert all(ti["state"] == "failed" for ti in body["task_instances"]) + assert body["total_entries"] == 1 + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_applies_note_to_each_run(self, test_client, session): + """``note`` in the body is applied to every cleared run in the same transaction.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": False, + "note": "bulk cleared by test", + "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": DAG1_RUN2_ID}], + }, + ) + assert response.status_code == 200 + for run_id in (DAG1_RUN1_ID, DAG1_RUN2_ID): + dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == run_id)) + assert dag_run.note == "bulk cleared by test" + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_wildcard_rejects_missing_dag_id(self, test_client): + """``~`` URL requires every entry to carry a concrete dag_id; 400 otherwise.""" + response = test_client.post( + "/dags/~/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}], + }, + ) + assert response.status_code == 400 + assert DAG1_RUN1_ID in response.json()["detail"] + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_specific_url_rejects_mismatched_dag_id(self, test_client): + """When the URL has a specific dag_id, mismatched per-entity dag_id is rejected.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [{"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}], + }, + ) + assert response.status_code == 400 + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_missing_run_returns_404(self, test_client): + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [{"dag_run_id": "does_not_exist"}], + }, + ) + assert response.status_code == 404 + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_rejects_only_new_with_only_failed(self, test_client): + """``only_new`` and ``only_failed`` are mutually exclusive at the body validator level.""" + response = test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={ + "dry_run": True, + "only_new": True, + "only_failed": True, + "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}], + }, + ) + assert response.status_code == 422 + + def test_bulk_clear_unauthenticated_returns_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={"dry_run": False, "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}]}, + ) + assert response.status_code == 401 + + def test_bulk_clear_unauthorized_returns_403(self, unauthorized_test_client): + response = unauthorized_test_client.post( + f"/dags/{DAG1_ID}/clearDagRuns", + json={"dry_run": False, "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}]}, + ) + assert response.status_code == 403 + + class TestClearDagRunOnlyNew: """Integration tests for only_new=True using a real two-version DAG. diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index c07e6e999c151..3a5cb8e9997dd 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -120,6 +120,38 @@ class BulkDAGRunBody(BaseModel): dag_id: Annotated[str | None, Field(title="Dag Id")] = None +class Note(RootModel[str]): + root: Annotated[str, Field(max_length=1000, title="Note")] + + +class BulkDAGRunClearBody(BaseModel): + """ + Request body for the bulk clear Dag Runs endpoint. + """ + + model_config = ConfigDict( + extra="forbid", + ) + dry_run: Annotated[bool | None, Field(title="Dry Run")] = True + only_failed: Annotated[bool | None, Field(title="Only Failed")] = False + only_new: Annotated[ + bool | None, + Field( + description="Only queue newly added tasks in the latest Dag version without clearing existing tasks.", + title="Only New", + ), + ] = False + run_on_latest_version: Annotated[ + bool | None, + Field( + description="(Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``.", + title="Run On Latest Version", + ), + ] = None + note: Annotated[Note | None, Field(title="Note")] = None + dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag Runs")] + + class BulkDeleteActionBulkDAGRunBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -157,10 +189,6 @@ class BulkResponse(BaseModel): ] = None -class Note(RootModel[str]): - root: Annotated[str, Field(max_length=1000, title="Note")] - - class BulkUpdateActionBulkDAGRunBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -344,7 +372,7 @@ class DAGRunClearBody(BaseModel): run_on_latest_version: Annotated[ bool | None, Field( - description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun).", + description="(Experimental) Run on the latest bundle version of the Dag after clearing. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False``.", title="Run On Latest Version", ), ] = None From ed3bf8ca0b768566edf7928d541cec57f9ecb3d8 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 12:22:17 +0200 Subject: [PATCH 2/7] Simplify clearDagRuns dedup with an ordered dict and rename handler --- .../core_api/routes/public/dag_run.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index a85d039e7ef71..18af252c49583 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -363,7 +363,7 @@ def clear_dag_run( responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), dependencies=[Depends(requires_access_dag_run_clear_bulk()), Depends(action_logging())], ) -def bulk_clear_dag_runs( +def clear_dag_runs( dag_id: str, body: BulkDAGRunClearBody, dag_bag: DagBagDep, @@ -373,8 +373,8 @@ def bulk_clear_dag_runs( """Clear multiple Dag Runs in a single request.""" url_dag_id_is_wildcard = dag_id == "~" - seen_targets: set[tuple[str, str]] = set() - resolved_targets: list[tuple[str, str]] = [] + # No ordered set type in Python, using a dict with throwaway values as replacement. + runs_to_clear: dict[tuple[str, str], None] = {} for run in body.dag_runs: if url_dag_id_is_wildcard: if not run.dag_id or run.dag_id == "~": @@ -383,7 +383,7 @@ def bulk_clear_dag_runs( f"When the URL dag_id is '~', every entry must provide a concrete dag_id " f"(missing on dag_run_id: {run.dag_run_id!r}).", ) - target = (run.dag_id, run.dag_run_id) + run_to_clear = (run.dag_id, run.dag_run_id) else: entity_dag_id = run.dag_id or dag_id if entity_dag_id != dag_id: @@ -391,14 +391,12 @@ def bulk_clear_dag_runs( status.HTTP_400_BAD_REQUEST, f"Entry dag_id {entity_dag_id!r} does not match the URL dag_id {dag_id!r}.", ) - target = (dag_id, run.dag_run_id) - if target not in seen_targets: - seen_targets.add(target) - resolved_targets.append(target) + run_to_clear = (dag_id, run.dag_run_id) + runs_to_clear[run_to_clear] = None if body.dry_run: affected: list[TaskInstanceResponse | NewTaskResponse] = [] - for run_dag_id, run_id in resolved_targets: + for run_dag_id, run_id in runs_to_clear: get_dag_run_and_dag_for_clear( session=session, dag_bag=dag_bag, dag_id=run_dag_id, dag_run_id=run_id ) @@ -418,7 +416,7 @@ def bulk_clear_dag_runs( ) cleared_runs: list[DagRun] = [] - for run_dag_id, run_id in resolved_targets: + for run_dag_id, run_id in runs_to_clear: dag_run, dag = get_dag_run_and_dag_for_clear( session=session, dag_bag=dag_bag, dag_id=run_dag_id, dag_run_id=run_id ) From e090de97d22e2dff3f6a39c6b126afbf1eadd8b0 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 12:27:11 +0200 Subject: [PATCH 3/7] Regenerate OpenAPI spec and UI client for clearDagRuns rename --- .../openapi/v2-rest-api-generated.yaml | 137 ++++++++---------- .../airflow/ui/openapi-gen/queries/common.ts | 2 +- .../airflow/ui/openapi-gen/queries/queries.ts | 38 ++--- .../ui/openapi-gen/requests/services.gen.ts | 64 ++++---- .../ui/openapi-gen/requests/types.gen.ts | 50 +++---- 5 files changed, 134 insertions(+), 157 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index da4727be51ac8..dc566f040c1f6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2777,80 +2777,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/clearDagRuns: - post: - tags: - - DagRun - summary: Bulk Clear Dag Runs - description: 'Clear multiple Dag Runs in a single request. - - - When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must - - carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, - - entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` - - returns the union of affected task instances across the listed runs - - without mutating state; the real clear returns the post-clear Dag Runs.' - operationId: bulk_clear_dag_runs - security: - - OAuth2PasswordBearer: [] - - HTTPBearer: [] - parameters: - - name: dag_id - in: path - required: true - schema: - type: string - title: Dag Id - requestBody: - required: true - content: - application/json: - schema: - $ref: '#/components/schemas/BulkDAGRunClearBody' - responses: - '200': - description: Successful Response - content: - application/json: - schema: - anyOf: - - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' - - $ref: '#/components/schemas/DAGRunCollectionResponse' - title: Response Bulk Clear Dag Runs - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Bad Request - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: post: tags: @@ -3054,6 +2980,69 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/clearDagRuns: + post: + tags: + - DagRun + summary: Clear Dag Runs + description: Clear multiple Dag Runs in a single request. + operationId: clear_dag_runs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/BulkDAGRunClearBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' + - $ref: '#/components/schemas/DAGRunCollectionResponse' + title: Response Clear Dag Runs + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dagSources/{dag_id}: get: tags: diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 58cf4eca1f95b..fb0ab311c5adf 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1035,9 +1035,9 @@ export type ConnectionServicePostConnectionMutationResult = Awaited>; export type ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited>; export type DagRunServiceTriggerDagRunMutationResult = Awaited>; -export type DagRunServiceBulkClearDagRunsMutationResult = Awaited>; export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited>; +export type DagRunServiceClearDagRunsMutationResult = Awaited>; export type DagServiceFavoriteDagMutationResult = Awaited>; export type DagServiceUnfavoriteDagMutationResult = Awaited>; export type TaskInstanceServiceGetTaskInstancesBatchMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 496f69f4cdb44..41118b64c7400 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2176,28 +2176,6 @@ export const useDagRunServiceTriggerDagRun = ({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); /** -* Bulk Clear Dag Runs -* Clear multiple Dag Runs in a single request. -* -* When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must -* carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, -* entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` -* returns the union of affected task instances across the listed runs -* without mutating state; the real clear returns the post-clear Dag Runs. -* @param data The data for the request. -* @param data.dagId -* @param data.requestBody -* @returns unknown Successful Response -* @throws ApiError -*/ -export const useDagRunServiceBulkClearDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.bulkClearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); -/** * Clear Dag Run * @param data The data for the request. * @param data.dagId @@ -2232,6 +2210,22 @@ export const useDagRunServiceGetListDagRunsBatch = ({ mutationFn: ({ dagId, requestBody }) => DagRunService.getListDagRunsBatch({ dagId, requestBody }) as unknown as Promise, ...options }); /** +* Clear Dag Runs +* Clear multiple Dag Runs in a single request. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceClearDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.clearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Favorite Dag * Mark the Dag as favorite. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 3731b3c3df962..30dc1ad095cdf 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, BulkClearDagRunsData, BulkClearDagRunsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, PatchTaskStateData, PatchTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, ClearDagRunsData, ClearDagRunsResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, PatchTaskStateData, PatchTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -1146,40 +1146,6 @@ export class DagRunService { }); } - /** - * Bulk Clear Dag Runs - * Clear multiple Dag Runs in a single request. - * - * When the URL ``dag_id`` is ``~``, every entry in ``dag_runs`` must - * carry its own ``dag_id``. When the URL ``dag_id`` is a specific Dag, - * entries may omit ``dag_id`` and inherit from the URL. ``dry_run`` - * returns the union of affected task instances across the listed runs - * without mutating state; the real clear returns the post-clear Dag Runs. - * @param data The data for the request. - * @param data.dagId - * @param data.requestBody - * @returns unknown Successful Response - * @throws ApiError - */ - public static bulkClearDagRuns(data: BulkClearDagRunsData): CancelablePromise { - return __request(OpenAPI, { - method: 'POST', - url: '/api/v2/dags/{dag_id}/clearDagRuns', - path: { - dag_id: data.dagId - }, - body: data.requestBody, - mediaType: 'application/json', - errors: { - 400: 'Bad Request', - 401: 'Unauthorized', - 403: 'Forbidden', - 404: 'Not Found', - 422: 'Validation Error' - } - }); - } - /** * Clear Dag Run * @param data The data for the request. @@ -1267,6 +1233,34 @@ export class DagRunService { }); } + /** + * Clear Dag Runs + * Clear multiple Dag Runs in a single request. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static clearDagRuns(data: ClearDagRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/clearDagRuns', + path: { + dag_id: data.dagId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 400: 'Bad Request', + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Get Dag Run Stats * Get duration statistics for a DAG based on its historical completed runs. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 9e6b0ebede751..4359dcbc9f2c6 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2942,13 +2942,6 @@ export type GetUpstreamAssetEventsData = { export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; -export type BulkClearDagRunsData = { - dagId: string; - requestBody: BulkDAGRunClearBody; -}; - -export type BulkClearDagRunsResponse = ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; - export type ClearDagRunData = { dagId: string; dagRunId: string; @@ -2979,6 +2972,13 @@ export type GetListDagRunsBatchData = { export type GetListDagRunsBatchResponse = DAGRunCollectionResponse; +export type ClearDagRunsData = { + dagId: string; + requestBody: BulkDAGRunClearBody; +}; + +export type ClearDagRunsResponse = ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; + export type GetDagRunStatsData = { dagId: string; dagRunId: string; @@ -5420,18 +5420,14 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/clearDagRuns': { + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { post: { - req: BulkClearDagRunsData; + req: ClearDagRunData; res: { /** * Successful Response */ - 200: ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; - /** - * Bad Request - */ - 400: HTTPExceptionResponse; + 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; /** * Unauthorized */ @@ -5451,14 +5447,14 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { - post: { - req: ClearDagRunData; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { + get: { + req: WaitDagRunUntilFinishedData; res: { /** * Successful Response */ - 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; + 200: unknown; /** * Unauthorized */ @@ -5478,14 +5474,14 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { - get: { - req: WaitDagRunUntilFinishedData; + '/api/v2/dags/{dag_id}/dagRuns/list': { + post: { + req: GetListDagRunsBatchData; res: { /** * Successful Response */ - 200: unknown; + 200: DAGRunCollectionResponse; /** * Unauthorized */ @@ -5505,14 +5501,18 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/list': { + '/api/v2/dags/{dag_id}/clearDagRuns': { post: { - req: GetListDagRunsBatchData; + req: ClearDagRunsData; res: { /** * Successful Response */ - 200: DAGRunCollectionResponse; + 200: ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; /** * Unauthorized */ From b0a9e322f603a5b9ae6f2dad271f84f710bc5c43 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 15:08:57 +0200 Subject: [PATCH 4/7] Factor out shared bulk dag-run authorization request builder --- .../airflow/api_fastapi/core_api/security.py | 67 ++++++++----------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index d0e947b8f5f4d..b93ebb2cc209f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -730,46 +730,48 @@ def inner( return inner +def _build_dag_run_access_requests( + entity_methods: list[tuple[str, ResourceMethod]], +) -> list[IsAuthorizedDagRequest]: + """ + Build per-entity DagRun authorization requests for a batched access check. + + ``entity_methods`` is a list of ``(dag_id, method)`` pairs with unresolvable + entries (no dag_id or the ``~`` wildcard) already filtered out by the caller. + The team for each dag is resolved once and shared across that dag's requests. + """ + dag_id_to_team = {dag_id: DagModel.get_team_name(dag_id) for dag_id, _ in entity_methods} + return [ + { + "method": method, + "access_entity": DagAccessEntity.RUN, + "details": DagDetails(id=dag_id, team_name=dag_id_to_team.get(dag_id)), + } + for dag_id, method in entity_methods + ] + + def requires_access_dag_run_bulk() -> Callable[[BulkBody[BulkDAGRunBody], BaseUser, str], None]: def inner( request: BulkBody[BulkDAGRunBody], user: GetUserDep, dag_id: str, ) -> None: - resolved_dag_ids: set[str] = set() - for action in request.actions: - for entity in action.entities: - if isinstance(entity, str): - entity_dag_id: str | None = dag_id - else: - entity_dag_id = entity.dag_id or dag_id - if entity_dag_id and entity_dag_id != "~": - resolved_dag_ids.add(entity_dag_id) - - dag_id_to_team = {d: DagModel.get_team_name(d) for d in resolved_dag_ids} - - requests: list[IsAuthorizedDagRequest] = [] + entity_methods: list[tuple[str, ResourceMethod]] = [] for action in request.actions: methods = _get_resource_methods_from_bulk_request(action) for entity in action.entities: if isinstance(entity, str): - entity_dag_id = dag_id + entity_dag_id: str | None = dag_id else: entity_dag_id = entity.dag_id or dag_id # Entities that can't be resolved are surfaced as 400 in the service's BulkResponse. if not entity_dag_id or entity_dag_id == "~": continue for method in methods: - requests.append( - { - "method": method, - "access_entity": DagAccessEntity.RUN, - "details": DagDetails( - id=entity_dag_id, team_name=dag_id_to_team.get(entity_dag_id) - ), - } - ) + entity_methods.append((entity_dag_id, method)) + requests = _build_dag_run_access_requests(entity_methods) _requires_access( is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_dag( requests=requests, @@ -786,27 +788,14 @@ def inner( user: GetUserDep, dag_id: str, ) -> None: - resolved_dag_ids: set[str] = set() - for run in body.dag_runs: - entity_dag_id = run.dag_id or dag_id - if entity_dag_id and entity_dag_id != "~": - resolved_dag_ids.add(entity_dag_id) - - dag_id_to_team = {d: DagModel.get_team_name(d) for d in resolved_dag_ids} - - requests: list[IsAuthorizedDagRequest] = [] + entity_methods: list[tuple[str, ResourceMethod]] = [] for run in body.dag_runs: entity_dag_id = run.dag_id or dag_id if not entity_dag_id or entity_dag_id == "~": continue - requests.append( - { - "method": "PUT", - "access_entity": DagAccessEntity.RUN, - "details": DagDetails(id=entity_dag_id, team_name=dag_id_to_team.get(entity_dag_id)), - } - ) + entity_methods.append((entity_dag_id, "PUT")) + requests = _build_dag_run_access_requests(entity_methods) _requires_access( is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_dag( requests=requests, From 1620badd0cd81276ba9c0bf274ac70c59e879357 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 15:34:04 +0200 Subject: [PATCH 5/7] Restore per-dag team-name dedup in bulk dag-run authorization --- airflow-core/src/airflow/api_fastapi/core_api/security.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index b93ebb2cc209f..154929af064fa 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -740,7 +740,8 @@ def _build_dag_run_access_requests( entries (no dag_id or the ``~`` wildcard) already filtered out by the caller. The team for each dag is resolved once and shared across that dag's requests. """ - dag_id_to_team = {dag_id: DagModel.get_team_name(dag_id) for dag_id, _ in entity_methods} + resolved_dag_ids = {dag_id for dag_id, _ in entity_methods} + dag_id_to_team = {dag_id: DagModel.get_team_name(dag_id) for dag_id in resolved_dag_ids} return [ { "method": method, From 6d300cc2c9fb5b5bdbe4a6e4197e3094a422332b Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 15:39:27 +0200 Subject: [PATCH 6/7] Patch resolver in service module for clear-endpoint resolver test --- .../unit/api_fastapi/core_api/routes/public/test_dag_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 9bbb5d066d357..448fc65c4d19b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -2728,12 +2728,12 @@ def test_dag_level_false_overrides_fallback_true(self, dag_maker, session): def test_clear_endpoint_invokes_resolver_when_field_omitted(self, test_client): """Clearing without run_on_latest_version triggers the server-side resolver.""" with mock.patch( - "airflow.api_fastapi.core_api.routes.public.dag_run.resolve_run_on_latest_version", + "airflow.api_fastapi.core_api.services.public.dag_run.resolve_run_on_latest_version", return_value=False, ) as mock_resolver: response = test_client.post( f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", - json={"dry_run": True}, + json={"dry_run": False}, ) assert response.status_code == 200 mock_resolver.assert_called_once() From 684edf350aaa25e0088ebd8b7caacb235415fe75 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Mon, 1 Jun 2026 15:56:41 +0200 Subject: [PATCH 7/7] Test bulk clearDagRuns rejects unauthorized dag_ids in request body --- .../core_api/routes/public/test_dag_run.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 448fc65c4d19b..f570e5ca3a79c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -2041,6 +2041,58 @@ def test_bulk_clear_unauthorized_returns_403(self, unauthorized_test_client): ) assert response.status_code == 403 + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_rejects_unauthorized_dag_ids_from_request_body(self, test_client, session): + """A 403 at the route level if any entry references a Dag the user can't access; nothing is cleared.""" + restricted_bundle_name = "restricted-bundle-clear" + restricted_team_name = "restricted-team-clear" + restricted_bundle = DagBundleModel(name=restricted_bundle_name) + restricted_team = Team(name=restricted_team_name) + restricted_bundle.teams.append(restricted_team) + session.add_all([restricted_bundle, restricted_team]) + session.flush() + # Restrict DAG2 by attaching it to a team-scoped bundle the limited user has no access to. + session.execute( + update(DagModel).where(DagModel.dag_id == DAG2_ID).values(bundle_name=restricted_bundle_name) + ) + session.commit() + + states_before = { + run_id: session.scalar(select(DagRun.state).where(DagRun.run_id == run_id)) + for run_id in (DAG1_RUN1_ID, DAG2_RUN1_ID) + } + + auth_manager = test_client.app.state.auth_manager + token = auth_manager._get_token_signer().generate( + auth_manager.serialize_user( + SimpleAuthManagerUser(username="limited-user", role="user", teams=[]), + ) + ) + with ( + mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked", return_value=False), + TestClient( + test_client.app, + headers={"Authorization": f"Bearer {token}"}, + base_url=str(test_client.base_url), + ) as limited_test_client, + ): + response = limited_test_client.post( + "/dags/~/clearDagRuns", + json={ + "dry_run": False, + "dag_runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}, + ], + }, + ) + + assert response.status_code == 403 + # The batched auth check rejects the whole request, so the authorized Dag's run is not cleared either. + session.expire_all() + for run_id, state_before in states_before.items(): + assert session.scalar(select(DagRun.state).where(DagRun.run_id == run_id)) == state_before + class TestClearDagRunOnlyNew: """Integration tests for only_new=True using a real two-version DAG.