diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 481cc0387fed1..9ae0762e1e132 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -73,6 +73,42 @@ def validate_model(cls, data: Any) -> Any: return data +class DagRunIdentifier(StrictBaseModel): + """Identifier for a Dag run targeted by a bulk operation.""" + + dag_run_id: str + dag_id: str | None = None + + +class BulkDagRunBody(DagRunIdentifier, DAGRunPatchBody): + """Request body for bulk update and delete Dag runs (patch fields plus run identity).""" + + +class BulkClearDagRunsBody(StrictBaseModel): + """Request body for the bulk clear Dag runs endpoint.""" + + runs: list[DagRunIdentifier] + only_failed: bool = False + only_new: bool = Field( + default=False, + description="Only queue newly added tasks in the latest Dag version without clearing existing tasks.", + ) + run_on_latest_version: bool = Field( + default=False, + description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.", + ) + dry_run: bool = True + note: str | None = Field(default=None, max_length=1000) + + @model_validator(mode="before") + @classmethod + def validate_model(cls, data: Any) -> Any: + """Validate clear Dag runs form.""" + if data.get("only_new") and data.get("only_failed"): + raise ValueError("only_new and only_failed are mutually exclusive") + return data + + class DAGRunResponse(BaseModel): """Dag Run serializer for responses.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..ecea019ab8ecd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2107,12 +2107,25 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: - post: + /api/v2/dags/{dag_id}/dagRuns: + patch: tags: - DagRun - summary: Clear Dag Run - operationId: clear_dag_run + summary: Bulk Dag Runs + description: 'Bulk update and delete Dag runs. + + + A single request handles many Dag runs in one transaction. Per-entity + + failures are reported via ``BulkResponse`` so that a partial failure does + + not abort the whole batch. + + + The path''s ``dag_id`` may be ``~`` for cross-DAG operations; in that case + + each entity must specify its own ``dag_id`` in the body.' + operationId: bulk_dag_runs security: - OAuth2PasswordBearer: [] - HTTPBearer: [] @@ -2123,28 +2136,19 @@ paths: schema: type: string title: Dag Id - - name: dag_run_id - in: path - required: true - schema: - type: string - title: Dag Run Id requestBody: required: true content: application/json: schema: - $ref: '#/components/schemas/DAGRunClearBody' + $ref: '#/components/schemas/BulkBody_BulkDagRunBody_' responses: '200': description: Successful Response content: application/json: schema: - anyOf: - - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' - - $ref: '#/components/schemas/DAGRunResponse' - title: Response Clear Dag Run + $ref: '#/components/schemas/BulkResponse' '401': content: application/json: @@ -2157,19 +2161,12 @@ paths: schema: $ref: '#/components/schemas/HTTPExceptionResponse' description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found '422': description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/dagRuns: get: tags: - DagRun @@ -2792,6 +2789,134 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/clear: + post: + tags: + - DagRun + summary: Post Clear Dag Runs + description: 'Clear multiple Dag runs in a single request. + + + Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + + each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + + and per-entry failures are reported via ``BulkActionResponse.errors``. + + + The path''s ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + + must reference the same ``dag_id`` as the path.' + operationId: post_clear_dag_runs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/BulkClearDagRunsBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BulkActionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: + post: + tags: + - DagRun + summary: Clear Dag Run + operationId: clear_dag_run + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunClearBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' + - $ref: '#/components/schemas/DAGRunResponse' + title: Response Clear Dag Run + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait: get: tags: @@ -10775,6 +10900,21 @@ components: This structure helps users understand which key actions succeeded and which failed.' + BulkBody_BulkDagRunBody_: + properties: + actions: + items: + oneOf: + - $ref: '#/components/schemas/BulkCreateAction_BulkDagRunBody_' + - $ref: '#/components/schemas/BulkUpdateAction_BulkDagRunBody_' + - $ref: '#/components/schemas/BulkDeleteAction_BulkDagRunBody_' + type: array + title: Actions + additionalProperties: false + type: object + required: + - actions + title: BulkBody[BulkDagRunBody] BulkBody_BulkTaskInstanceBody_: properties: actions: @@ -10835,6 +10975,67 @@ components: required: - actions title: BulkBody[VariableBody] + BulkClearDagRunsBody: + properties: + runs: + items: + $ref: '#/components/schemas/DagRunIdentifier' + type: array + title: Runs + only_failed: + type: boolean + title: Only Failed + default: false + only_new: + type: boolean + title: Only New + description: Only queue newly added tasks in the latest Dag version without + clearing existing tasks. + default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + description: (Experimental) Run on the latest bundle version of the Dag + after clearing the Dag Run. + default: false + dry_run: + type: boolean + title: Dry Run + default: true + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note + additionalProperties: false + type: object + required: + - runs + title: BulkClearDagRunsBody + description: Request body for the bulk clear Dag runs endpoint. + BulkCreateAction_BulkDagRunBody_: + properties: + action: + type: string + const: create + title: Action + description: The action to be performed on the entities. + entities: + items: + $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entities to be created. + action_on_existence: + $ref: '#/components/schemas/BulkActionOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkCreateAction[BulkDagRunBody] BulkCreateAction_BulkTaskInstanceBody_: properties: action: @@ -10923,6 +11124,57 @@ components: - action - entities title: BulkCreateAction[VariableBody] + BulkDagRunBody: + properties: + state: + anyOf: + - $ref: '#/components/schemas/DAGRunPatchStates' + - type: 'null' + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note + dag_run_id: + type: string + title: Dag Run Id + dag_id: + anyOf: + - type: string + - type: 'null' + title: Dag Id + additionalProperties: false + type: object + required: + - dag_run_id + title: BulkDagRunBody + description: Request body for bulk update and delete Dag runs (patch fields + plus run identity). + BulkDeleteAction_BulkDagRunBody_: + properties: + action: + type: string + const: delete + title: Action + description: The action to be performed on the entities. + entities: + items: + anyOf: + - type: string + - $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entity id/key or entity objects to be deleted. + action_on_non_existence: + $ref: '#/components/schemas/BulkActionNotOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkDeleteAction[BulkDagRunBody] BulkDeleteAction_BulkTaskInstanceBody_: properties: action: @@ -11104,6 +11356,38 @@ components: - task_id title: BulkTaskInstanceBody description: Request body for bulk update, and delete task instances. + BulkUpdateAction_BulkDagRunBody_: + properties: + action: + type: string + const: update + title: Action + description: The action to be performed on the entities. + entities: + items: + $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entities to be updated. + update_mask: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Update Mask + description: A list of field names to update for each entity.Only these + fields will be applied from the request body to the database model.Any + extra fields provided will be ignored. + action_on_non_existence: + $ref: '#/components/schemas/BulkActionNotOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkUpdateAction[BulkDagRunBody] BulkUpdateAction_BulkTaskInstanceBody_: properties: action: @@ -12604,6 +12888,22 @@ components: - partition_key title: DagRunAssetReference description: DagRun serializer for asset responses. + DagRunIdentifier: + properties: + dag_run_id: + type: string + title: Dag Run Id + dag_id: + anyOf: + - type: string + - type: 'null' + title: Dag Id + additionalProperties: false + type: object + required: + - dag_run_id + title: DagRunIdentifier + description: Identifier for a Dag run targeted by a bulk operation. DagRunState: type: string enum: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 0e3670409b48f..4b3d9b267358b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -76,7 +76,14 @@ from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.base import OrmClause from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse +from airflow.api_fastapi.core_api.datamodels.common import ( + BulkActionResponse, + BulkBody, + BulkResponse, +) from airflow.api_fastapi.core_api.datamodels.dag_run import ( + BulkClearDagRunsBody, + BulkDagRunBody, DAGRunClearBody, DAGRunCollectionResponse, DAGRunPatchBody, @@ -97,7 +104,11 @@ requires_access_asset, requires_access_dag, ) -from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter +from airflow.api_fastapi.core_api.services.public.dag_run import ( + BulkDagRunService, + DagRunWaiter, + bulk_clear_dag_runs, +) from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager @@ -288,6 +299,69 @@ def get_upstream_asset_events( ) +@dag_run_router.patch( + "", + dependencies=[ + Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), + Depends(action_logging()), + ], +) +def bulk_dag_runs( + request: BulkBody[BulkDagRunBody], + session: SessionDep, + dag_id: str, + dag_bag: DagBagDep, + user: GetUserDep, +) -> BulkResponse: + """ + Bulk update and delete Dag runs. + + A single request handles many Dag runs in one transaction. Per-entity + failures are reported via ``BulkResponse`` so that a partial failure does + not abort the whole batch. + + The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case + each entity must specify its own ``dag_id`` in the body. + """ + return BulkDagRunService( + session=session, request=request, dag_id=dag_id, dag_bag=dag_bag, user=user + ).handle_request() + + +@dag_run_router.post( + "/clear", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), + Depends(action_logging()), + ], +) +def post_clear_dag_runs( + dag_id: str, + body: BulkClearDagRunsBody, + dag_bag: DagBagDep, + session: SessionDep, + user: GetUserDep, +) -> BulkActionResponse: + """ + Clear multiple Dag runs in a single request. + + Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + and per-entry failures are reported via ``BulkActionResponse.errors``. + + The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + must reference the same ``dag_id`` as the path. + """ + return bulk_clear_dag_runs( + body=body, + dag_id=dag_id, + dag_bag=dag_bag, + session=session, + user=user, + ) + + @dag_run_router.post( "/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index e7d7cb98c939f..11209edab86f4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -20,12 +20,41 @@ import asyncio import itertools import json +import logging import operator -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import attrs -from sqlalchemy import select +from fastapi import HTTPException, status +from sqlalchemy import select, tuple_ +from sqlalchemy.orm import joinedload +from sqlalchemy.orm.session import Session +from airflow.api.common.mark_tasks import ( + set_dag_run_state_to_failed, + set_dag_run_state_to_queued, + set_dag_run_state_to_success, +) +from airflow.api_fastapi.app import get_auth_manager +from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails +from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run +from airflow.api_fastapi.core_api.datamodels.common import ( + BulkActionNotOnExistence, + BulkActionResponse, + BulkBody, + BulkCreateAction, + BulkDeleteAction, + BulkUpdateAction, +) +from airflow.api_fastapi.core_api.datamodels.dag_run import ( + BulkClearDagRunsBody, + BulkDagRunBody, + DAGRunPatchStates, +) +from airflow.api_fastapi.core_api.security import GetUserDep +from airflow.api_fastapi.core_api.services.public.common import BulkService +from airflow.listeners.listener import get_listener_manager +from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun from airflow.models.xcom import XCOM_RETURN_KEY, XComModel from airflow.utils.session import create_session_async @@ -34,6 +63,14 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterator + from airflow.serialization.definitions.dag import SerializedDAG + + +AuthMethod = Literal["GET", "PUT", "POST", "DELETE"] + + +log = logging.getLogger(__name__) + @attrs.define class DagRunWaiter: @@ -86,3 +123,450 @@ async def wait(self) -> AsyncGenerator[str, None]: await asyncio.sleep(self.interval) yield await self._serialize_response(dag_run := await self._get_dag_run()) yield "\n" + + +def _format_dag_run_key(dag_id: str, dag_run_id: str) -> str: + return f"{dag_id}.{dag_run_id}" + + +def _authorize_dag_run( + *, + session: Session, + user, + dag_id: str, + method: AuthMethod, + cache: dict[str, bool], +) -> bool: + """ + Return whether ``user`` may perform ``method`` on Dag runs of ``dag_id``. + + The result is memoised in ``cache`` so a bulk request that touches many + runs of the same Dag only pays for one ``is_authorized_dag`` call per Dag. + """ + if dag_id not in cache: + team_name = DagModel.get_team_name(dag_id, session=session) + cache[dag_id] = get_auth_manager().is_authorized_dag( + method=method, + access_entity=DagAccessEntity.RUN, + details=DagDetails(id=dag_id, team_name=team_name), + user=user, + ) + return cache[dag_id] + + +def _cached_dag_for_run( + dag_bag: DagBagDep, + dag_run: DagRun, + session: Session, + cache: dict[str, SerializedDAG], +) -> SerializedDAG: + """Return the SerializedDAG for ``dag_run``, memoising lookups by ``dag_id``.""" + dag_id = dag_run.dag_id + if dag_id not in cache: + cache[dag_id] = get_dag_for_run(dag_bag, dag_run, session=session) + return cache[dag_id] + + +def _apply_state_change( + dag_run: DagRun, + new_state: DAGRunPatchStates, + dag: SerializedDAG, + session: Session, +) -> None: + """Apply ``new_state`` to ``dag_run`` and fire the matching listener hook.""" + if new_state == DAGRunPatchStates.SUCCESS: + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + try: + get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="") + except Exception: + log.exception("error calling listener") + elif new_state == DAGRunPatchStates.QUEUED: + # Notification on queued is intentionally skipped; the scheduler emits + # the RUNNING notification instead. + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif new_state == DAGRunPatchStates.FAILED: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + try: + get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="") + except Exception: + log.exception("error calling listener") + + +def _apply_note(dag_run: DagRun, note: str | None, user_id: str) -> None: + if dag_run.dag_run_note is None: + dag_run.note = (note, user_id) + else: + dag_run.dag_run_note.content = note + dag_run.dag_run_note.user_id = user_id + + +def _validate_no_wildcard_in_resolved( + *, + dag_id: str, + dag_run_id: str, + results: BulkActionResponse, +) -> bool: + if dag_id == "~" or dag_run_id == "~": + results.errors.append( + { + "error": ( + "When the path uses the ``~`` wildcard, ``dag_id`` and ``dag_run_id`` must be " + "specified in the body for each entity." + ), + "status_code": status.HTTP_400_BAD_REQUEST, + } + ) + return False + return True + + +def _validate_path_dag_id_match( + *, + path_dag_id: str, + entity_dag_id: str | None, + dag_run_id: str, + results: BulkActionResponse, +) -> bool: + if path_dag_id != "~" and entity_dag_id is not None and entity_dag_id != path_dag_id: + results.errors.append( + { + "error": ( + f"Entity dag_id '{entity_dag_id}' does not match path dag_id '{path_dag_id}'. " + "Use ``~`` in the path for cross-DAG bulk operations." + ), + "status_code": status.HTTP_400_BAD_REQUEST, + "dag_id": entity_dag_id, + "dag_run_id": dag_run_id, + } + ) + return False + return True + + +def _fetch_dag_runs( + session: Session, + keys: set[tuple[str, str]], +) -> tuple[dict[tuple[str, str], DagRun], set[tuple[str, str]]]: + """Batch-fetch Dag runs by ``(dag_id, dag_run_id)`` pairs in a single query.""" + if not keys: + return {}, set() + dag_runs = session.scalars( + select(DagRun) + .options(joinedload(DagRun.dag_model)) + .where(tuple_(DagRun.dag_id, DagRun.run_id).in_(keys)) + ).all() + found = {(dr.dag_id, dr.run_id): dr for dr in dag_runs} + not_found = keys - set(found.keys()) + return found, not_found + + +class BulkDagRunService(BulkService[BulkDagRunBody]): + """Service for handling bulk operations on Dag runs.""" + + def __init__( + self, + session: Session, + request: BulkBody[BulkDagRunBody], + dag_id: str, + dag_bag: DagBagDep, + user: GetUserDep, + ): + super().__init__(session, request) + self.dag_id = dag_id + self.dag_bag = dag_bag + self.user = user + + def _resolve_identifiers(self, entity: str | BulkDagRunBody) -> tuple[str, str]: + """Return ``(dag_id, dag_run_id)`` for an entity, falling back to the path's ``dag_id``.""" + if isinstance(entity, str): + return self.dag_id, entity + dag_id = entity.dag_id or self.dag_id + return dag_id, entity.dag_run_id + + def _check_dag_authorization( + self, + dag_id: str, + method: AuthMethod, + action_name: str, + results: BulkActionResponse, + cache: dict[str, bool], + ) -> bool: + if not _authorize_dag_run( + session=self.session, + user=self.user, + dag_id=dag_id, + method=method, + cache=cache, + ): + results.errors.append( + { + "error": f"User is not authorized to {action_name} Dag runs for DAG '{dag_id}'", + "status_code": status.HTTP_403_FORBIDDEN, + } + ) + return False + return True + + def handle_bulk_create( + self, action: BulkCreateAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + results.errors.append( + { + "error": "Dag runs bulk create is not supported via this endpoint; use the trigger Dag run endpoint instead.", + "status_code": status.HTTP_405_METHOD_NOT_ALLOWED, + } + ) + + def handle_bulk_update( + self, action: BulkUpdateAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + """Bulk update Dag runs (state and/or note).""" + update_mask = action.update_mask + auth_cache: dict[str, bool] = {} + dag_cache: dict[str, SerializedDAG] = {} + keys: set[tuple[str, str]] = set() + entity_map: dict[tuple[str, str], BulkDagRunBody] = {} + + for entity in action.entities: + if isinstance(entity, str): + results.errors.append( + { + "error": "Bulk update requires entities as objects, not strings.", + "status_code": status.HTTP_400_BAD_REQUEST, + } + ) + continue + dag_id, dag_run_id = self._resolve_identifiers(entity) + if not _validate_no_wildcard_in_resolved(dag_id=dag_id, dag_run_id=dag_run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=self.dag_id, + entity_dag_id=entity.dag_id, + dag_run_id=dag_run_id, + results=results, + ): + continue + if not self._check_dag_authorization(dag_id, "PUT", action.action.value, results, auth_cache): + continue + keys.add((dag_id, dag_run_id)) + entity_map[(dag_id, dag_run_id)] = entity + + try: + found, not_found = _fetch_dag_runs(self.session, keys) + + if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found: + missing = [{"dag_id": d, "dag_run_id": r} for d, r in not_found] + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"The Dag runs with these identifiers were not found: {missing}", + ) + + for key, dag_run in found.items(): + entity = entity_map[key] + fields_to_update = entity.model_fields_set + if update_mask: + fields_to_update = fields_to_update.intersection(update_mask) + fields_to_update = fields_to_update - {"dag_id", "dag_run_id"} + if not fields_to_update: + continue + + try: + with self.session.begin_nested(): + dag = _cached_dag_for_run(self.dag_bag, dag_run, self.session, dag_cache) + if "state" in fields_to_update and entity.state is not None: + _apply_state_change(dag_run, entity.state, dag, self.session) + if "note" in fields_to_update: + refreshed = self.session.get(DagRun, dag_run.id) + if refreshed is not None: + _apply_note(refreshed, entity.note, self.user.get_id()) + except HTTPException as exc: + results.errors.append( + { + "error": str(exc.detail), + "status_code": exc.status_code, + "dag_id": key[0], + "dag_run_id": key[1], + } + ) + continue + except Exception as exc: + results.errors.append( + { + "error": str(exc), + "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR, + "dag_id": key[0], + "dag_run_id": key[1], + } + ) + continue + + results.success.append(_format_dag_run_key(*key)) + except HTTPException as e: + results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) + + def handle_bulk_delete( + self, action: BulkDeleteAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + """Bulk delete Dag runs.""" + auth_cache: dict[str, bool] = {} + keys: set[tuple[str, str]] = set() + + for entity in action.entities: + dag_id, dag_run_id = self._resolve_identifiers(entity) + entity_dag_id = entity.dag_id if isinstance(entity, BulkDagRunBody) else None + if not _validate_no_wildcard_in_resolved(dag_id=dag_id, dag_run_id=dag_run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=self.dag_id, + entity_dag_id=entity_dag_id, + dag_run_id=dag_run_id, + results=results, + ): + continue + if not self._check_dag_authorization(dag_id, "DELETE", action.action.value, results, auth_cache): + continue + keys.add((dag_id, dag_run_id)) + + try: + found, not_found = _fetch_dag_runs(self.session, keys) + + if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found: + missing = [{"dag_id": d, "dag_run_id": r} for d, r in not_found] + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"The Dag runs with these identifiers were not found: {missing}", + ) + + deletable_states = {s.value for s in DAGRunPatchStates} + for key, dag_run in found.items(): + if dag_run.state not in deletable_states: + results.errors.append( + { + "error": ( + f"The DagRun with dag_id: `{dag_run.dag_id}` and run_id: `{dag_run.run_id}` " + f"cannot be deleted in {dag_run.state} state" + ), + "status_code": status.HTTP_409_CONFLICT, + "dag_id": dag_run.dag_id, + "dag_run_id": dag_run.run_id, + } + ) + continue + self.session.delete(dag_run) + results.success.append(_format_dag_run_key(*key)) + except HTTPException as e: + results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) + + +def bulk_clear_dag_runs( + body: BulkClearDagRunsBody, + dag_id: str, + dag_bag: DagBagDep, + session: Session, + user: GetUserDep, +) -> BulkActionResponse: + """ + Run ``dag.clear()`` for each ``(dag_id, dag_run_id)`` in ``body.runs`` within a single transaction. + + Returns ``BulkActionResponse`` with per-run success keys and per-run failure entries so that a partial + failure does not abort the entire batch. + """ + results = BulkActionResponse() + auth_cache: dict[str, bool] = {} + dag_cache: dict[str, SerializedDAG] = {} + keys_to_fetch: list[tuple[str, str]] = [] + + for identifier in body.runs: + run_dag_id = identifier.dag_id or dag_id + run_id = identifier.dag_run_id + + if not _validate_no_wildcard_in_resolved(dag_id=run_dag_id, dag_run_id=run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=dag_id, + entity_dag_id=identifier.dag_id, + dag_run_id=run_id, + results=results, + ): + continue + if not _authorize_dag_run( + session=session, user=user, dag_id=run_dag_id, method="PUT", cache=auth_cache + ): + results.errors.append( + { + "error": f"User is not authorized to clear Dag runs for DAG '{run_dag_id}'", + "status_code": status.HTTP_403_FORBIDDEN, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + keys_to_fetch.append((run_dag_id, run_id)) + + found, _ = _fetch_dag_runs(session, set(keys_to_fetch)) + + for key in keys_to_fetch: + run_dag_id, run_id = key + dag_run = found.get(key) + if dag_run is None: + results.errors.append( + { + "error": f"Dag run not found for dag_id '{run_dag_id}', dag_run_id '{run_id}'", + "status_code": status.HTTP_404_NOT_FOUND, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + try: + with session.begin_nested(): + dag = _cached_dag_for_run(dag_bag, dag_run, session, dag_cache) + if body.dry_run: + dag.clear( + run_id=run_id, + task_ids=None, + only_failed=body.only_failed, + only_new=body.only_new, + run_on_latest_version=body.run_on_latest_version, + dry_run=True, + session=session, + ) + else: + dag.clear( + run_id=run_id, + task_ids=None, + only_failed=body.only_failed, + only_new=body.only_new, + run_on_latest_version=body.run_on_latest_version, + session=session, + ) + if body.note is not None: + refreshed = session.get(DagRun, dag_run.id) + if refreshed is not None: + _apply_note(refreshed, body.note, user.get_id()) + except HTTPException as exc: + results.errors.append( + { + "error": str(exc.detail), + "status_code": exc.status_code, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + except Exception as exc: + results.errors.append( + { + "error": str(exc), + "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + results.success.append(_format_dag_run_key(run_dag_id, run_id)) + + return results diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 0e44f9f63bd74..294e3ffefefae 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -994,8 +994,9 @@ export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited>; export type ConnectionServiceTestConnectionMutationResult = Awaited>; export type ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited>; -export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceTriggerDagRunMutationResult = Awaited>; +export type DagRunServicePostClearDagRunsMutationResult = Awaited>; +export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited>; export type DagServiceFavoriteDagMutationResult = Awaited>; export type DagServiceUnfavoriteDagMutationResult = Awaited>; @@ -1012,6 +1013,7 @@ export type DagParsingServiceReparseDagFileMutationResult = Awaited>; export type ConnectionServiceBulkConnectionsMutationResult = Awaited>; export type DagRunServicePatchDagRunMutationResult = Awaited>; +export type DagRunServiceBulkDagRunsMutationResult = Awaited>; export type DagServicePatchDagsMutationResult = Awaited>; export type DagServicePatchDagMutationResult = Awaited>; export type TaskInstanceServicePatchTaskInstanceMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index dc43dd038afb1..d86f7f09dd7bf 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2,7 +2,7 @@ import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { BackfillPostBody, BulkBody_BulkDagRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, BulkClearDagRunsBody, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -2080,6 +2080,45 @@ export const useConnectionServiceTestConnection = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: () => ConnectionService.createDefaultConnections() as unknown as Promise, ...options }); /** +* Trigger Dag Run +* Trigger a Dag. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns DAGRunResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServiceTriggerDagRun = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); +/** +* Post Clear Dag Runs +* Clear multiple Dag runs in a single request. +* +* Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: +* each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction +* and per-entry failures are reported via ``BulkActionResponse.errors``. +* +* The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry +* must reference the same ``dag_id`` as the path. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns BulkActionResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServicePostClearDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.postClearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Clear Dag Run * @param data The data for the request. * @param data.dagId @@ -2098,22 +2137,6 @@ export const useDagRunServiceClearDagRun = ({ mutationFn: ({ dagId, dagRunId, requestBody }) => DagRunService.clearDagRun({ dagId, dagRunId, requestBody }) as unknown as Promise, ...options }); /** -* Trigger Dag Run -* Trigger a Dag. -* @param data The data for the request. -* @param data.dagId -* @param data.requestBody -* @returns DAGRunResponse Successful Response -* @throws ApiError -*/ -export const useDagRunServiceTriggerDagRun = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); -/** * Get List Dag Runs Batch * Get a list of Dag Runs. * @param data The data for the request. @@ -2355,6 +2378,29 @@ export const useDagRunServicePatchDagRun = ({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as unknown as Promise, ...options }); /** +* Bulk Dag Runs +* Bulk update and delete Dag runs. +* +* A single request handles many Dag runs in one transaction. Per-entity +* failures are reported via ``BulkResponse`` so that a partial failure does +* not abort the whole batch. +* +* The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case +* each entity must specify its own ``dag_id`` in the body. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns BulkResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServiceBulkDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.bulkDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Patch Dags * Patch multiple Dags. * diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 99b1c49345548..ecd7db6a4cf5d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -620,6 +620,32 @@ The response includes a list of successful keys and any errors encountered durin This structure helps users understand which key actions succeeded and which failed.` } as const; +export const $BulkBody_BulkDagRunBody_ = { + properties: { + actions: { + items: { + oneOf: [ + { + '$ref': '#/components/schemas/BulkCreateAction_BulkDagRunBody_' + }, + { + '$ref': '#/components/schemas/BulkUpdateAction_BulkDagRunBody_' + }, + { + '$ref': '#/components/schemas/BulkDeleteAction_BulkDagRunBody_' + } + ] + }, + type: 'array', + title: 'Actions' + } + }, + additionalProperties: false, + type: 'object', + required: ['actions'], + title: 'BulkBody[BulkDagRunBody]' +} as const; + export const $BulkBody_BulkTaskInstanceBody_ = { properties: { actions: { @@ -724,6 +750,84 @@ export const $BulkBody_VariableBody_ = { title: 'BulkBody[VariableBody]' } as const; +export const $BulkClearDagRunsBody = { + properties: { + runs: { + items: { + '$ref': '#/components/schemas/DagRunIdentifier' + }, + type: 'array', + title: 'Runs' + }, + only_failed: { + type: 'boolean', + title: 'Only Failed', + default: false + }, + only_new: { + type: 'boolean', + title: 'Only New', + description: 'Only queue newly added tasks in the latest Dag version without clearing existing tasks.', + default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + description: '(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.', + default: false + }, + dry_run: { + type: 'boolean', + title: 'Dry Run', + default: true + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' + } + }, + additionalProperties: false, + type: 'object', + required: ['runs'], + title: 'BulkClearDagRunsBody', + description: 'Request body for the bulk clear Dag runs endpoint.' +} as const; + +export const $BulkCreateAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'create', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + '$ref': '#/components/schemas/BulkDagRunBody' + }, + type: 'array', + title: 'Entities', + description: 'A list of entities to be created.' + }, + action_on_existence: { + '$ref': '#/components/schemas/BulkActionOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkCreateAction[BulkDagRunBody]' +} as const; + export const $BulkCreateAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -832,6 +936,87 @@ export const $BulkCreateAction_VariableBody_ = { title: 'BulkCreateAction[VariableBody]' } as const; +export const $BulkDagRunBody = { + properties: { + state: { + anyOf: [ + { + '$ref': '#/components/schemas/DAGRunPatchStates' + }, + { + type: 'null' + } + ] + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' + }, + dag_run_id: { + type: 'string', + title: 'Dag Run Id' + }, + dag_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Id' + } + }, + additionalProperties: false, + type: 'object', + required: ['dag_run_id'], + title: 'BulkDagRunBody', + description: 'Request body for bulk update and delete Dag runs (patch fields plus run identity).' +} as const; + +export const $BulkDeleteAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'delete', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + anyOf: [ + { + type: 'string' + }, + { + '$ref': '#/components/schemas/BulkDagRunBody' + } + ] + }, + type: 'array', + title: 'Entities', + description: 'A list of entity id/key or entity objects to be deleted.' + }, + action_on_non_existence: { + '$ref': '#/components/schemas/BulkActionNotOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkDeleteAction[BulkDagRunBody]' +} as const; + export const $BulkDeleteAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -1102,6 +1287,48 @@ export const $BulkTaskInstanceBody = { description: 'Request body for bulk update, and delete task instances.' } as const; +export const $BulkUpdateAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'update', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + '$ref': '#/components/schemas/BulkDagRunBody' + }, + type: 'array', + title: 'Entities', + description: 'A list of entities to be updated.' + }, + update_mask: { + anyOf: [ + { + items: { + type: 'string' + }, + type: 'array' + }, + { + type: 'null' + } + ], + title: 'Update Mask', + description: 'A list of field names to update for each entity.Only these fields will be applied from the request body to the database model.Any extra fields provided will be ignored.' + }, + action_on_non_existence: { + '$ref': '#/components/schemas/BulkActionNotOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkUpdateAction[BulkDagRunBody]' +} as const; + export const $BulkUpdateAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -3401,6 +3628,31 @@ export const $DagRunAssetReference = { description: 'DagRun serializer for asset responses.' } as const; +export const $DagRunIdentifier = { + properties: { + dag_run_id: { + type: 'string', + title: 'Dag Run Id' + }, + dag_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Id' + } + }, + additionalProperties: false, + type: 'object', + required: ['dag_run_id'], + title: 'DagRunIdentifier', + description: 'Identifier for a Dag run targeted by a bulk operation.' +} as const; + export const $DagRunState = { type: 'string', enum: ['queued', 'running', 'success', 'failed'], diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index ec044ac30856d..6edb6c3cff923 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, PostClearDagRunsData, PostClearDagRunsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -963,28 +963,33 @@ export class DagRunService { } /** - * Clear Dag Run + * Bulk Dag Runs + * Bulk update and delete Dag runs. + * + * A single request handles many Dag runs in one transaction. Per-entity + * failures are reported via ``BulkResponse`` so that a partial failure does + * not abort the whole batch. + * + * The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case + * each entity must specify its own ``dag_id`` in the body. * @param data The data for the request. * @param data.dagId - * @param data.dagRunId * @param data.requestBody - * @returns unknown Successful Response + * @returns BulkResponse Successful Response * @throws ApiError */ - public static clearDagRun(data: ClearDagRunData): CancelablePromise { + public static bulkDagRuns(data: BulkDagRunsData): CancelablePromise { return __request(OpenAPI, { - method: 'POST', - url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear', + method: 'PATCH', + url: '/api/v2/dags/{dag_id}/dagRuns', path: { - dag_id: data.dagId, - dag_run_id: data.dagRunId + dag_id: data.dagId }, body: data.requestBody, mediaType: 'application/json', errors: { 401: 'Unauthorized', 403: 'Forbidden', - 404: 'Not Found', 422: 'Validation Error' } }); @@ -1148,6 +1153,68 @@ export class DagRunService { }); } + /** + * Post Clear Dag Runs + * Clear multiple Dag runs in a single request. + * + * Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + * each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + * and per-entry failures are reported via ``BulkActionResponse.errors``. + * + * The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + * must reference the same ``dag_id`` as the path. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns BulkActionResponse Successful Response + * @throws ApiError + */ + public static postClearDagRuns(data: PostClearDagRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/dagRuns/clear', + path: { + dag_id: data.dagId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Clear Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static clearDagRun(data: ClearDagRunData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the Dag run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 4d401bfc1a496..44ee61a81fd4b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -186,6 +186,10 @@ export type BulkActionResponse = { }>; }; +export type BulkBody_BulkDagRunBody_ = { + actions: Array<(BulkCreateAction_BulkDagRunBody_ | BulkUpdateAction_BulkDagRunBody_ | BulkDeleteAction_BulkDagRunBody_)>; +}; + export type BulkBody_BulkTaskInstanceBody_ = { actions: Array<(BulkCreateAction_BulkTaskInstanceBody_ | BulkUpdateAction_BulkTaskInstanceBody_ | BulkDeleteAction_BulkTaskInstanceBody_)>; }; @@ -202,6 +206,36 @@ export type BulkBody_VariableBody_ = { actions: Array<(BulkCreateAction_VariableBody_ | BulkUpdateAction_VariableBody_ | BulkDeleteAction_VariableBody_)>; }; +/** + * Request body for the bulk clear Dag runs endpoint. + */ +export type BulkClearDagRunsBody = { + runs: Array; + only_failed?: boolean; + /** + * Only queue newly added tasks in the latest Dag version without clearing existing tasks. + */ + only_new?: boolean; + /** + * (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. + */ + run_on_latest_version?: boolean; + dry_run?: boolean; + note?: string | null; +}; + +export type BulkCreateAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "create"; + /** + * A list of entities to be created. + */ + entities: Array; + action_on_existence?: BulkActionOnExistence; +}; + export type BulkCreateAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -250,6 +284,28 @@ export type BulkCreateAction_VariableBody_ = { action_on_existence?: BulkActionOnExistence; }; +/** + * Request body for bulk update and delete Dag runs (patch fields plus run identity). + */ +export type BulkDagRunBody = { + state?: DAGRunPatchStates | null; + note?: string | null; + dag_run_id: string; + dag_id?: string | null; +}; + +export type BulkDeleteAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "delete"; + /** + * A list of entity id/key or entity objects to be deleted. + */ + entities: Array<(string | BulkDagRunBody)>; + action_on_non_existence?: BulkActionNotOnExistence; +}; + export type BulkDeleteAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -336,6 +392,22 @@ export type BulkTaskInstanceBody = { dag_run_id?: string | null; }; +export type BulkUpdateAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "update"; + /** + * A list of entities to be updated. + */ + entities: Array; + /** + * A list of field names to update for each entity.Only these fields will be applied from the request body to the database model.Any extra fields provided will be ignored. + */ + update_mask?: Array<(string)> | null; + action_on_non_existence?: BulkActionNotOnExistence; +}; + export type BulkUpdateAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -841,6 +913,14 @@ export type DagRunAssetReference = { partition_key: string | null; }; +/** + * Identifier for a Dag run targeted by a bulk operation. + */ +export type DagRunIdentifier = { + dag_run_id: string; + dag_id?: string | null; +}; + /** * All possible states that a DagRun can be in. * @@ -2699,13 +2779,12 @@ export type GetUpstreamAssetEventsData = { export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; -export type ClearDagRunData = { +export type BulkDagRunsData = { dagId: string; - dagRunId: string; - requestBody: DAGRunClearBody; + requestBody: BulkBody_BulkDagRunBody_; }; -export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | DAGRunResponse; +export type BulkDagRunsResponse = BulkResponse; export type GetDagRunsData = { bundleVersion?: string | null; @@ -2803,6 +2882,21 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; +export type PostClearDagRunsData = { + dagId: string; + requestBody: BulkClearDagRunsBody; +}; + +export type PostClearDagRunsResponse = BulkActionResponse; + +export type ClearDagRunData = { + dagId: string; + dagRunId: string; + requestBody: DAGRunClearBody; +}; + +export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | DAGRunResponse; + export type WaitDagRunUntilFinishedData = { dagId: string; dagRunId: string; @@ -5066,14 +5160,14 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { - post: { - req: ClearDagRunData; + '/api/v2/dags/{dag_id}/dagRuns': { + patch: { + req: BulkDagRunsData; res: { /** * Successful Response */ - 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; + 200: BulkResponse; /** * Unauthorized */ @@ -5082,18 +5176,12 @@ export type $OpenApiTs = { * Forbidden */ 403: HTTPExceptionResponse; - /** - * Not Found - */ - 404: HTTPExceptionResponse; /** * Validation Error */ 422: HTTPValidationError; }; }; - }; - '/api/v2/dags/{dag_id}/dagRuns': { get: { req: GetDagRunsData; res: { @@ -5153,6 +5241,60 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/dagRuns/clear': { + post: { + req: PostClearDagRunsData; + res: { + /** + * Successful Response + */ + 200: BulkActionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { + post: { + req: ClearDagRunData; + res: { + /** + * Successful Response + */ + 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { get: { req: WaitDagRunUntilFinishedData; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx deleted file mode 100644 index 50d1d9d8e6a61..0000000000000 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx +++ /dev/null @@ -1,48 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import "@testing-library/jest-dom"; -import { render, screen, waitFor } from "@testing-library/react"; -import { describe, expect, it } from "vitest"; - -import { AppWrapper } from "src/utils/AppWrapper"; - -// The dag_runs mock handler (see src/mocks/handlers/dag_runs.ts) returns: -// - run_before_filter (logical_date: 2024-12-31) — excluded when filtering Jan 2025 -// - run_in_range (logical_date: 2025-01-15) — included when filtering Jan 2025 -describe("DagRuns logical date filter", () => { - it("shows all runs when no logical date filter is applied", async () => { - render(); - - await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); - expect(screen.getByText("run_before_filter")).toBeInTheDocument(); - }); - - it("filters runs by logical_date_gte and logical_date_lte URL params", async () => { - render( - , - ); - - await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); - expect(screen.queryByText("run_before_filter")).not.toBeInTheDocument(); - }); -}); diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx new file mode 100644 index 0000000000000..787230e6e7ed1 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx @@ -0,0 +1,163 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { CgRedo } from "react-icons/cg"; + +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Checkbox, Dialog } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkClearDagRuns } from "src/queries/useBulkClearDagRuns"; + +import { BulkErrorList } from "./BulkErrorList"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const DEFAULT_SELECTED_OPTIONS = ["existingTasks"]; + +const BulkClearDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [selectedOptions, setSelectedOptions] = useState>(DEFAULT_SELECTED_OPTIONS); + const [note, setNote] = useState(null); + const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); + const { actionErrors, clear, error, isPending, reset } = useBulkClearDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const handleClose = () => { + setSelectedOptions(DEFAULT_SELECTED_OPTIONS); + setNote(null); + setRunOnLatestVersion(false); + reset(); + onClose(); + }; + + const onlyFailed = selectedOptions.includes("onlyFailed"); + const onlyNew = selectedOptions.includes("newTasks"); + + return ( + <> + + + { + if (!details.open) { + handleClose(); + } + }} + open={open} + size="xl" + > + + + + + {translate("dags:runAndTaskActions.clear.title", { + type: translate("dagRun_other"), + })} + + + + + + + + + + + + {selectedDagRuns.length} {translate("dagRun_other")} + + + + {selectedDagRuns.map((dagRun) => ( + + + {dagRun.dag_id} + {" "} + / {dagRun.dag_run_id} + + ))} + + + + + + + + setRunOnLatestVersion(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.runOnLatestVersion")} + + + + + + + + ); +}; + +export default BulkClearDagRunsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx new file mode 100644 index 0000000000000..e3b52b38786a8 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx @@ -0,0 +1,117 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Dialog } from "src/components/ui"; +import { useBulkDeleteDagRuns } from "src/queries/useBulkDeleteDagRuns"; + +import { BulkErrorList } from "./BulkErrorList"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const { actionErrors, error, isPending, remove, reset } = useBulkDeleteDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const handleClose = () => { + reset(); + onClose(); + }; + + return ( + <> + + + { + if (!details.open) { + handleClose(); + } + }} + open={open} + size="xl" + > + + + + + {translate("dags:runAndTaskActions.delete.dialog.title", { + type: translate("dagRun_other"), + })} + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.warning", { + type: translate("dagRun_other"), + })} + + + + + {selectedDagRuns.map((dagRun) => ( + + + {dagRun.dag_id} + {" "} + / {dagRun.dag_run_id} + + ))} + + + + + + + + + + + + + ); +}; + +export default BulkDeleteDagRunsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkErrorList.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkErrorList.tsx new file mode 100644 index 0000000000000..a0d1901db1196 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkErrorList.tsx @@ -0,0 +1,56 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Text, VStack } from "@chakra-ui/react"; + +import { Alert } from "src/components/ui"; + +import type { BulkErrorEntry } from "./bulkActionTypes"; + +type Props = { + readonly errors: ReadonlyArray; +}; + +const formatRunRef = (entry: BulkErrorEntry): string => { + const parts = [entry.dag_id, entry.dag_run_id].filter( + (part): part is string => part !== undefined && part !== null, + ); + + return parts.length > 0 ? parts.join(" / ") : "(unknown run)"; +}; + +export const BulkErrorList = ({ errors }: Props) => { + if (errors.length === 0) { + return undefined; + } + + return ( + + + {errors.map((entry) => ( + + + {formatRunRef(entry)} + + : {entry.error} + + ))} + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkMarkDagRunsAsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkMarkDagRunsAsButton.tsx new file mode 100644 index 0000000000000..e53a9d4ea0943 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkMarkDagRunsAsButton.tsx @@ -0,0 +1,156 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Box, Button, Flex, Heading, HStack, useDisclosure, VStack } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { FiX } from "react-icons/fi"; +import { LuCheck } from "react-icons/lu"; + +import type { DAGRunPatchStates, DAGRunResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { allowedStates } from "src/components/MarkAs/utils"; +import { StateBadge } from "src/components/StateBadge"; +import { Dialog, Menu } from "src/components/ui"; +import { useBulkUpdateDagRuns } from "src/queries/useBulkUpdateDagRuns"; + +import { BulkErrorList } from "./BulkErrorList"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const BulkMarkDagRunsAsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [state, setState] = useState("success"); + const [note, setNote] = useState(null); + const { actionErrors, error, isPending, reset, update } = useBulkUpdateDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const affectedCount = (targetState: DAGRunPatchStates) => + selectedDagRuns.filter((dr) => dr.state !== targetState).length; + + const handleOpen = (newState: DAGRunPatchStates) => { + setState(newState); + setNote(null); + reset(); + onOpen(); + }; + + const handleClose = () => { + setNote(null); + reset(); + onClose(); + }; + + const directlyAffected = selectedDagRuns.filter((dr) => dr.state !== state); + + return ( + + + +
+ +
+
+ + {allowedStates.map((menuState) => { + const count = affectedCount(menuState); + + return ( + { + if (count > 0) { + handleOpen(menuState); + } + }} + value={menuState} + > + + {translate(`common:states.${menuState}`)} + + {count} + + + + ); + })} + +
+ + { + if (!details.open) { + handleClose(); + } + }} + open={open} + size="xl" + > + + + + + {translate("dags:runAndTaskActions.markAs.title", { + state, + type: translate("dagRun_other"), + })}{" "} + + + + + + + + + + + + + + + + +
+ ); +}; + +export default BulkMarkDagRunsAsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx new file mode 100644 index 0000000000000..fcb798beaf0af --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx @@ -0,0 +1,314 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import "@testing-library/jest-dom"; +import { fireEvent, render, screen, waitFor, within } from "@testing-library/react"; +import { http, HttpResponse } from "msw"; +import { setupServer, type SetupServerApi } from "msw/node"; +import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; + +import { handlers } from "src/mocks/handlers"; +import { AppWrapper } from "src/utils/AppWrapper"; + +let server: SetupServerApi; + +beforeAll(() => { + server = setupServer(...handlers); + server.listen({ onUnhandledRequest: "bypass" }); +}); + +afterEach(() => server.resetHandlers()); +afterAll(() => server.close()); + +// The dag_runs mock handler (see src/mocks/handlers/dag_runs.ts) returns: +// - run_before_filter (logical_date: 2024-12-31) — excluded when filtering Jan 2025 +// - run_in_range (logical_date: 2025-01-15) — included when filtering Jan 2025 +describe("DagRuns logical date filter", () => { + it("shows all runs when no logical date filter is applied", async () => { + render(); + + await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); + expect(screen.getByText("run_before_filter")).toBeInTheDocument(); + }); + + it("filters runs by logical_date_gte and logical_date_lte URL params", async () => { + render( + , + ); + + await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); + expect(screen.queryByText("run_before_filter")).not.toBeInTheDocument(); + }); +}); + +describe("DagRuns row selection", () => { + it("renders a select checkbox per row and reveals the action bar on selection", async () => { + render(); + + await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); + + const runRow = screen.getByText("run_in_range").closest("tr"); + + expect(runRow).not.toBeNull(); + const [rowCheckbox] = within(runRow as HTMLElement).getAllByRole("checkbox"); + + expect(rowCheckbox).toBeDefined(); + fireEvent.click(rowCheckbox as HTMLElement); + + await waitFor(() => expect(screen.getByText(/1\s+Selected/iu)).toBeInTheDocument()); + }); +}); + +const selectRow = async (runText: string) => { + await waitFor(() => expect(screen.getByText(runText)).toBeInTheDocument()); + const row = screen.getByText(runText).closest("tr"); + + if (row === null) { + throw new Error(`Row for ${runText} not found`); + } + const [checkbox] = within(row).getAllByRole("checkbox"); + + if (checkbox === undefined) { + throw new Error(`Checkbox in row for ${runText} not found`); + } + fireEvent.click(checkbox); + + return row; +}; + +// Per-row buttons are Chakra IconButtons whose accessible name comes from +// `aria-label` and whose textContent only contains icon glyphs / single-char +// separators. The bulk-action buttons in the ActionBar render the translated +// label as a visible text node. We pick the button whose textContent itself +// matches the label regex, which is independent of locale state (the regex +// matches the i18n key under tests, the translated string in production). +const findBulkActionButton = (label: RegExp) => + screen.getAllByRole("button", { name: label }).find((btn) => label.test(btn.textContent)); + +type BulkBodyShape = { + actions: Array<{ + action: "create" | "delete" | "update"; + entities: Array<{ dag_id: string; dag_run_id: string; state?: string }>; + }>; +}; + +describe("DagRuns bulk delete", () => { + it("fires a single PATCH /dagRuns request listing every selected run on success", async () => { + const onBulk = vi.fn<(body: BulkBodyShape) => void>(); + + server.use( + http.patch("/api/v2/dags/:dagId/dagRuns", async ({ request }) => { + onBulk((await request.json()) as BulkBodyShape); + + return HttpResponse.json({ + delete: { + errors: [], + success: ["test_dag.run_in_range", "test_dag.run_before_filter"], + }, + }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + const bulkDeleteBtn = findBulkActionButton(/delete/iu); + + expect(bulkDeleteBtn).toBeDefined(); + fireEvent.click(bulkDeleteBtn as HTMLElement); + + // Chakra's ActionBar.Root itself has role="dialog", so wait for the + // confirm dialog (the second one) to mount and pick it explicitly. + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + expect(within(confirmDialog).getByText(/run_in_range/u)).toBeInTheDocument(); + expect(within(confirmDialog).getByText(/run_before_filter/u)).toBeInTheDocument(); + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onBulk).toHaveBeenCalledTimes(1)); + const body = onBulk.mock.calls[0]?.[0] as BulkBodyShape; + + expect(body.actions).toHaveLength(1); + expect(body.actions[0]?.action).toBe("delete"); + const entityKeys = body.actions[0]?.entities + .map((entity) => `${entity.dag_id}.${entity.dag_run_id}`) + .sort(); + + expect(entityKeys).toEqual(["test_dag.run_before_filter", "test_dag.run_in_range"]); + + await waitFor(() => expect(screen.queryByRole("dialog")).not.toBeInTheDocument()); + }); + + it("keeps the dialog open and surfaces a per-entry error from the bulk response", async () => { + server.use( + http.patch("/api/v2/dags/:dagId/dagRuns", () => + HttpResponse.json({ + delete: { + errors: [{ error: "boom", status_code: 500 }], + success: ["test_dag.run_in_range"], + }, + }), + ), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + const bulkDeleteBtn = findBulkActionButton(/delete/iu); + + expect(bulkDeleteBtn).toBeDefined(); + fireEvent.click(bulkDeleteBtn as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + // Toaster portal lives outside AppWrapper, so the partial-success toast is not asserted. + await waitFor(() => expect(within(confirmDialog).getByText(/boom/iu)).toBeInTheDocument()); + expect(confirmDialog).toBeInTheDocument(); + }); +}); + +type ClearBodyShape = { + dry_run: boolean; + only_failed: boolean; + only_new: boolean; + runs: Array<{ dag_id: string; dag_run_id: string }>; +}; + +describe("DagRuns bulk clear", () => { + it("fires a single POST /dagRuns/clear with the selected runs and dialog options", async () => { + const onClear = vi.fn<(body: ClearBodyShape) => void>(); + + server.use( + http.post("/api/v2/dags/:dagId/dagRuns/clear", async ({ request }) => { + onClear((await request.json()) as ClearBodyShape); + + return HttpResponse.json({ + errors: [], + success: ["test_dag.run_in_range", "test_dag.run_before_filter"], + }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + const bulkClearBtn = findBulkActionButton(/clear/iu); + + expect(bulkClearBtn).toBeDefined(); + fireEvent.click(bulkClearBtn as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onClear).toHaveBeenCalledTimes(1)); + const body = onClear.mock.calls[0]?.[0] as ClearBodyShape; + + expect(body.dry_run).toBe(false); + expect(body.only_failed).toBe(false); + expect(body.only_new).toBe(false); + const runKeys = body.runs.map((run) => `${run.dag_id}.${run.dag_run_id}`).sort(); + + expect(runKeys).toEqual(["test_dag.run_before_filter", "test_dag.run_in_range"]); + }); +}); + +describe("DagRuns bulk mark-as", () => { + it("fires a single PATCH /dagRuns request with the chosen state for each affected run", async () => { + const onBulk = vi.fn<(body: BulkBodyShape) => void>(); + + server.use( + http.patch("/api/v2/dags/:dagId/dagRuns", async ({ request }) => { + onBulk((await request.json()) as BulkBodyShape); + + return HttpResponse.json({ + update: { + errors: [], + success: ["test_dag.run_in_range", "test_dag.run_before_filter"], + }, + }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + // Open the Mark As menu — both seeded runs are in "success" state, so + // only the "failed" entry has a non-zero affected count. + const bulkMarkBtn = findBulkActionButton(/mark/iu); + + expect(bulkMarkBtn).toBeDefined(); + fireEvent.click(bulkMarkBtn as HTMLElement); + + // In the bulk Mark-As menu, items are plain Menu.Item nodes (no + // `data-testid`); the per-row MarkRunAsButton menus render their own + // menuitems with `data-testid="mark-run-as-*"`. Pick by absence of that + // testid so we always target the bulk menu regardless of which menu is + // open. + await waitFor(() => expect(screen.getAllByRole("menuitem").length).toBeGreaterThan(0)); + const failedItem = screen + .getAllByRole("menuitem") + .find((mi) => mi.dataset.value === "failed" && mi.dataset.testid === undefined); + + expect(failedItem).toBeDefined(); + fireEvent.click(failedItem as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onBulk).toHaveBeenCalledTimes(1)); + const body = onBulk.mock.calls[0]?.[0] as BulkBodyShape; + + expect(body.actions[0]?.action).toBe("update"); + expect(body.actions[0]?.entities.every((entity) => entity.state === "failed")).toBe(true); + const entityKeys = body.actions[0]?.entities + .map((entity) => `${entity.dag_id}.${entity.dag_run_id}`) + .sort(); + + expect(entityKeys).toEqual(["test_dag.run_before_filter", "test_dag.run_in_range"]); + }); +}); diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx similarity index 80% rename from airflow-core/src/airflow/ui/src/pages/DagRuns.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx index bc1aa2f3d14b9..f124f06d0ca6a 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx @@ -27,6 +27,7 @@ import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ClearRunButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; +import { useRowSelection, type GetColumnsParams } from "src/components/DataTable/useRowSelection"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { LimitedItemsList } from "src/components/LimitedItemsList"; @@ -37,13 +38,22 @@ import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; import { RouterLink } from "src/components/ui"; +import { ActionBar } from "src/components/ui/ActionBar"; +import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch"; -import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; +import BulkClearDagRunsButton from "./BulkClearDagRunsButton"; +import BulkDeleteDagRunsButton from "./BulkDeleteDagRunsButton"; +import BulkMarkDagRunsAsButton from "./BulkMarkDagRunsAsButton"; +import { DagRunsFilters } from "./DagRunsFilters"; + type DagRunRow = { row: { original: DAGRunResponse } }; + +const getRowKey = (dagRun: DAGRunResponse) => `${dagRun.dag_id}:${dagRun.dag_run_id}`; + const { BUNDLE_VERSION: BUNDLE_VERSION_PARAM, CONF_CONTAINS: CONF_CONTAINS_PARAM, @@ -67,7 +77,43 @@ const { TRIGGERING_USER_NAME_PATTERN: TRIGGERING_USER_NAME_PATTERN_PARAM, }: SearchParamsKeysType = SearchParamsKeys; -const runColumns = (translate: TFunction, dagId?: string): Array> => [ +type ColumnsParams = { + readonly dagId?: string; + readonly translate: TFunction; +}; + +const runColumns = ({ + allRowsSelected, + dagId, + onRowSelect, + onSelectAll, + selectedRows, + translate, +}: ColumnsParams & GetColumnsParams): Array> => [ + { + accessorKey: "select", + cell: ({ row }) => ( + onRowSelect(getRowKey(row.original), Boolean(event.checked))} + /> + ), + enableHiding: false, + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, ...(Boolean(dagId) ? [] : [ @@ -287,11 +333,30 @@ export const DagRuns = () => { }, ); - const columns = runColumns(translate, dagId); - const nextCursor = data?.next_cursor ?? undefined; const previousCursor = data?.previous_cursor ?? undefined; + const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, selectedRows } = + useRowSelection({ + data: data?.dag_runs, + getKey: getRowKey, + }); + + const selectedDagRuns = (data?.dag_runs ?? []).filter((dr) => selectedRows.has(getRowKey(dr))); + + const columns = runColumns({ + allRowsSelected, + dagId, + // GetColumnsParams requires `multiTeam`, but the Dag Runs columns do not + // branch on it (unlike `Variables` / `Connections`); kept false to satisfy + // the shared selection-column type. + multiTeam: false, + onRowSelect: handleRowSelect, + onSelectAll: handleSelectAll, + selectedRows, + translate, + }); + return ( <> @@ -306,6 +371,18 @@ export const DagRuns = () => { onStateChange={setTableURLState} previousCursor={previousCursor} /> + + + + {selectedRows.size} {translate("selected")} + + + + + + + + ); }; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/bulkActionTypes.ts b/airflow-core/src/airflow/ui/src/pages/DagRuns/bulkActionTypes.ts new file mode 100644 index 0000000000000..0a03bf03769f2 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/bulkActionTypes.ts @@ -0,0 +1,24 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +export type BulkErrorEntry = { + dag_id?: string | null; + dag_run_id?: string | null; + error: string; + status_code: number; +}; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts new file mode 100644 index 0000000000000..26ed3a4da898f --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts @@ -0,0 +1,19 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +export { DagRuns } from "./DagRuns"; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts new file mode 100644 index 0000000000000..128c482739510 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts @@ -0,0 +1,109 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceGetDagRunsKey, + useDagRunServicePostClearDagRuns, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; +import type { BulkErrorEntry } from "src/pages/DagRuns/bulkActionTypes"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export type BulkClearDagRunsOptions = { + note: string | null; + onlyFailed: boolean; + onlyNew: boolean; + runOnLatestVersion: boolean; +}; + +export const useBulkClearDagRuns = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const { t: translate } = useTranslation(["common", "dags"]); + const [actionErrors, setActionErrors] = useState>([]); + + const mutation = useDagRunServicePostClearDagRuns({ + onSuccess: async (response) => { + const errors = (response.errors ?? []) as Array; + const successes = response.success ?? []; + + if (successes.length > 0) { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + ]); + + toaster.create({ + description: translate("toaster.bulkClear.success.description", { + count: successes.length, + keys: successes.join(", "), + resourceName: translate("dagRun_other"), + }), + title: translate("toaster.bulkClear.success.title", { + resourceName: translate("dagRun_other"), + }), + type: "success", + }); + } + + if (errors.length > 0) { + setActionErrors(errors); + } else { + clearSelections(); + onSuccessConfirm(); + } + }, + }); + + const clear = (dagRuns: Array, options: BulkClearDagRunsOptions) => { + setActionErrors([]); + mutation.mutate({ + dagId: "~", + requestBody: { + dry_run: false, + note: options.note, + only_failed: options.onlyFailed, + only_new: options.onlyNew, + run_on_latest_version: options.runOnLatestVersion, + runs: dagRuns.map((dr) => ({ dag_id: dr.dag_id, dag_run_id: dr.dag_run_id })), + }, + }); + }; + + const reset = () => { + setActionErrors([]); + mutation.reset(); + }; + + return { + actionErrors, + clear, + error: mutation.error, + isPending: mutation.isPending, + reset, + }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts new file mode 100644 index 0000000000000..1610bad1b13d7 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts @@ -0,0 +1,106 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceBulkDagRuns, + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; +import type { BulkErrorEntry } from "src/pages/DagRuns/bulkActionTypes"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export const useBulkDeleteDagRuns = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const { t: translate } = useTranslation(["common", "dags"]); + const [actionErrors, setActionErrors] = useState>([]); + + const mutation = useDagRunServiceBulkDagRuns({ + onSuccess: async (response) => { + const errors = (response.delete?.errors ?? []) as Array; + const successes = response.delete?.success ?? []; + + if (successes.length > 0) { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + ]); + + toaster.create({ + description: translate("toaster.bulkDelete.success.description", { + count: successes.length, + keys: successes.join(", "), + resourceName: translate("dagRun_other"), + }), + title: translate("toaster.bulkDelete.success.title", { + resourceName: translate("dagRun_other"), + }), + type: "success", + }); + } + + if (errors.length > 0) { + setActionErrors(errors); + } else { + clearSelections(); + onSuccessConfirm(); + } + }, + }); + + const remove = (dagRuns: Array) => { + setActionErrors([]); + mutation.mutate({ + dagId: "~", + requestBody: { + actions: [ + { + action: "delete" as const, + action_on_non_existence: "skip", + entities: dagRuns.map((dr) => ({ + dag_id: dr.dag_id, + dag_run_id: dr.dag_run_id, + })), + }, + ], + }, + }); + }; + + const reset = () => { + setActionErrors([]); + mutation.reset(); + }; + + return { + actionErrors, + error: mutation.error, + isPending: mutation.isPending, + remove, + reset, + }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkUpdateDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkUpdateDagRuns.ts new file mode 100644 index 0000000000000..6b54f057121a8 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkUpdateDagRuns.ts @@ -0,0 +1,116 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceBulkDagRuns, + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import type { DAGRunPatchStates, DAGRunResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; +import type { BulkErrorEntry } from "src/pages/DagRuns/bulkActionTypes"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export type BulkUpdateDagRunsOptions = { + note: string | null; + state: DAGRunPatchStates; +}; + +export const useBulkUpdateDagRuns = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const { t: translate } = useTranslation(["common", "dags"]); + const [actionErrors, setActionErrors] = useState>([]); + + const mutation = useDagRunServiceBulkDagRuns({ + onSuccess: async (response) => { + const errors = (response.update?.errors ?? []) as Array; + const successes = response.update?.success ?? []; + + if (successes.length > 0) { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + ]); + + toaster.create({ + description: translate("toaster.bulkUpdate.success.description", { + count: successes.length, + keys: successes.join(", "), + resourceName: translate("dagRun_other"), + }), + title: translate("toaster.bulkUpdate.success.title", { + resourceName: translate("dagRun_other"), + }), + type: "success", + }); + } + + if (errors.length > 0) { + setActionErrors(errors); + } else { + clearSelections(); + onSuccessConfirm(); + } + }, + }); + + const update = (dagRuns: Array, options: BulkUpdateDagRunsOptions) => { + setActionErrors([]); + const updateMask = options.note === null ? ["state"] : ["state", "note"]; + + mutation.mutate({ + dagId: "~", + requestBody: { + actions: [ + { + action: "update" as const, + action_on_non_existence: "skip", + entities: dagRuns.map((dr) => ({ + dag_id: dr.dag_id, + dag_run_id: dr.dag_run_id, + note: options.note, + state: options.state, + })), + update_mask: updateMask, + }, + ], + }, + }); + }; + + const reset = () => { + setActionErrors([]); + mutation.reset(); + }; + + return { + actionErrors, + error: mutation.error, + isPending: mutation.isPending, + reset, + update, + }; +}; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 1c2e8b3219674..ac9fc6f653ad7 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1843,6 +1843,413 @@ def test_clear_dag_run_only_new_and_only_failed_mutually_exclusive(self, test_cl assert response.status_code == 422 +class TestBulkDagRuns: + """Tests for ``PATCH /dags/{dag_id}/dagRuns`` (bulk update / delete).""" + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_update_state_for_multiple_runs(self, test_client, session): + body = { + "actions": [ + { + "action": "update", + "entities": [ + {"dag_run_id": DAG1_RUN1_ID, "state": DagRunState.FAILED}, + {"dag_run_id": DAG1_RUN2_ID, "state": DagRunState.SUCCESS}, + ], + "update_mask": ["state"], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["update"]["success"]) == sorted( + [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"] + ) + assert result["update"]["errors"] == [] + + runs = session.scalars(select(DagRun).where(DagRun.dag_id == DAG1_ID)).all() + states = {r.run_id: r.state for r in runs} + assert states[DAG1_RUN1_ID] == DagRunState.FAILED + assert states[DAG1_RUN2_ID] == DagRunState.SUCCESS + + def test_bulk_delete_multiple_runs(self, test_client, session): + body = { + "actions": [ + { + "action": "delete", + "entities": [ + {"dag_run_id": DAG1_RUN1_ID}, + {"dag_run_id": DAG1_RUN2_ID}, + ], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["delete"]["success"]) == sorted( + [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"] + ) + assert result["delete"]["errors"] == [] + + remaining_run_ids = set(session.scalars(select(DagRun.run_id).where(DagRun.dag_id == DAG1_ID)).all()) + assert DAG1_RUN1_ID not in remaining_run_ids + assert DAG1_RUN2_ID not in remaining_run_ids + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_update_cross_dag_with_wildcard(self, test_client, session): + body = { + "actions": [ + { + "action": "update", + "entities": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID, "state": DagRunState.FAILED}, + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID, "state": DagRunState.QUEUED}, + ], + "update_mask": ["state"], + } + ] + } + response = test_client.patch("/dags/~/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["update"]["success"]) == sorted( + [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"] + ) + + def test_bulk_update_rejects_mismatched_dag_id_in_path(self, test_client, session): + body = { + "actions": [ + { + "action": "update", + "entities": [ + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID, "state": DagRunState.FAILED}, + ], + "update_mask": ["state"], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert result["update"]["success"] == [] + assert len(result["update"]["errors"]) == 1 + assert result["update"]["errors"][0]["status_code"] == 400 + + dag_run = session.scalar( + select(DagRun).where(DagRun.dag_id == DAG2_ID, DagRun.run_id == DAG2_RUN1_ID) + ) + assert dag_run.state == DAG2_RUN1_STATE + + def test_bulk_delete_rejects_mismatched_dag_id_in_path(self, test_client, session): + body = { + "actions": [ + { + "action": "delete", + "entities": [ + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}, + ], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert result["delete"]["success"] == [] + assert len(result["delete"]["errors"]) == 1 + assert result["delete"]["errors"][0]["status_code"] == 400 + + assert session.scalar(select(DagRun).where(DagRun.dag_id == DAG2_ID, DagRun.run_id == DAG2_RUN1_ID)) + + def test_bulk_delete_rejects_non_deletable_state(self, test_client, session): + dag_run = session.scalar( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID) + ) + dag_run.state = DagRunState.RUNNING + session.commit() + + body = { + "actions": [ + { + "action": "delete", + "entities": [ + {"dag_run_id": DAG1_RUN1_ID}, + {"dag_run_id": DAG1_RUN2_ID}, + ], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert result["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN2_ID}"] + assert len(result["delete"]["errors"]) == 1 + assert result["delete"]["errors"][0]["status_code"] == 409 + + remaining_run_ids = set(session.scalars(select(DagRun.run_id).where(DagRun.dag_id == DAG1_ID)).all()) + assert DAG1_RUN1_ID in remaining_run_ids + assert DAG1_RUN2_ID not in remaining_run_ids + + def test_bulk_update_with_skip_when_run_missing(self, test_client): + body = { + "actions": [ + { + "action": "update", + "entities": [ + {"dag_run_id": DAG1_RUN1_ID, "state": DagRunState.FAILED}, + {"dag_run_id": "does_not_exist", "state": DagRunState.FAILED}, + ], + "update_mask": ["state"], + "action_on_non_existence": "skip", + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + # Only the existing run is reported as success; the missing one is skipped silently. + assert result["update"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"] + assert result["update"]["errors"] == [] + + def test_bulk_update_fail_when_run_missing(self, test_client): + body = { + "actions": [ + { + "action": "update", + "entities": [ + {"dag_run_id": DAG1_RUN1_ID, "state": DagRunState.FAILED}, + {"dag_run_id": "does_not_exist", "state": DagRunState.FAILED}, + ], + "update_mask": ["state"], + "action_on_non_existence": "fail", + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + # The whole action is rejected because at least one entity is missing. + assert result["update"]["success"] == [] + assert len(result["update"]["errors"]) == 1 + assert result["update"]["errors"][0]["status_code"] == 404 + + def test_bulk_create_is_not_supported(self, test_client): + body = { + "actions": [ + { + "action": "create", + "entities": [{"dag_run_id": "new_run"}], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + result = response.json() + assert len(result["create"]["errors"]) == 1 + assert result["create"]["errors"][0]["status_code"] == 405 + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json={"actions": []}) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json={"actions": []}) + assert response.status_code == 403 + + @pytest.mark.parametrize( + ("state", "listener_state"), + [ + ("queued", []), + ("success", [DagRunState.SUCCESS]), + ("failed", [DagRunState.FAILED]), + ], + ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_update_notifies_listeners(self, test_client, state, listener_state, listener_manager): + from unit.listeners.class_listener import ClassBasedListener + + listener = ClassBasedListener() + listener_manager(listener) + body = { + "actions": [ + { + "action": "update", + "entities": [{"dag_run_id": DAG1_RUN1_ID, "state": state}], + "update_mask": ["state"], + } + ] + } + response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns", json=body) + assert response.status_code == 200 + assert listener.state == listener_state + + +class TestPostClearDagRuns: + """Tests for ``POST /dags/{dag_id}/dagRuns/clear`` (bulk clear).""" + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_two_runs(self, test_client, session): + body = { + "runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN2_ID}, + ], + "dry_run": False, + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["success"]) == sorted([f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]) + assert result["errors"] == [] + + # Cleared runs are reset to QUEUED. + runs = session.scalars( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id.in_([DAG1_RUN1_ID, DAG1_RUN2_ID])) + ).all() + assert {r.state for r in runs} == {DagRunState.QUEUED} + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_defaults_to_dry_run(self, test_client, session): + body = { + "runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN2_ID}, + ], + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["success"]) == sorted([f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]) + assert result["errors"] == [] + + runs = session.scalars( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id.in_([DAG1_RUN1_ID, DAG1_RUN2_ID])) + ).all() + states = {run.run_id: run.state for run in runs} + assert states[DAG1_RUN1_ID] == DAG1_RUN1_STATE + assert states[DAG1_RUN2_ID] == DAG1_RUN2_STATE + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_cross_dag_with_wildcard(self, test_client): + body = { + "runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}, + ], + "dry_run": False, + } + response = test_client.post("/dags/~/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert sorted(result["success"]) == sorted([f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"]) + + def test_bulk_clear_reports_missing_runs_as_per_entry_errors(self, test_client): + body = { + "runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG1_ID, "dag_run_id": "does_not_exist"}, + ], + "dry_run": False, + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + # Successful run is still cleared; missing run is reported per-entry rather + # than aborting the whole batch. + assert result["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"] + assert len(result["errors"]) == 1 + assert result["errors"][0]["status_code"] == 404 + assert result["errors"][0]["dag_run_id"] == "does_not_exist" + + def test_bulk_clear_rejects_mismatched_dag_id_in_path(self, test_client): + body = { + "runs": [{"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}], + "dry_run": False, + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert result["success"] == [] + assert len(result["errors"]) == 1 + assert result["errors"][0]["status_code"] == 400 + assert "does not match path dag_id" in result["errors"][0]["error"] + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_falls_back_to_path_dag_id(self, test_client, session): + body = { + "runs": [{"dag_run_id": DAG1_RUN1_ID}], + "dry_run": False, + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert result["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"] + assert result["errors"] == [] + + def test_bulk_clear_rejects_wildcard_path_when_entity_omits_dag_id(self, test_client): + body = { + "runs": [{"dag_run_id": DAG1_RUN1_ID}], + "dry_run": False, + } + response = test_client.post("/dags/~/dagRuns/clear", json=body) + assert response.status_code == 200 + result = response.json() + assert result["success"] == [] + assert len(result["errors"]) == 1 + assert result["errors"][0]["status_code"] == 400 + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_applies_note_when_provided(self, test_client, session): + body = { + "runs": [ + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}, + {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN2_ID}, + ], + "dry_run": False, + "note": "post-incident cleanup", + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + assert response.json()["errors"] == [] + + runs = session.scalars( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id.in_([DAG1_RUN1_ID, DAG1_RUN2_ID])) + ).all() + assert {r.note for r in runs} == {"post-incident cleanup"} + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_bulk_clear_dry_run_does_not_apply_note(self, test_client, session): + body = { + "runs": [{"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN2_ID}], + "note": "should not be saved", + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 200 + + run = session.scalar(select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN2_ID)) + assert run is not None + assert run.note is None + + def test_bulk_clear_validates_only_new_only_failed_mutual_exclusion(self, test_client): + body = { + "runs": [{"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID}], + "only_new": True, + "only_failed": True, + "dry_run": False, + } + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json=body) + assert response.status_code == 422 + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json={"runs": []}) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.post(f"/dags/{DAG1_ID}/dagRuns/clear", json={"runs": []}) + assert response.status_code == 403 + + class TestClearDagRunOnlyNew: """Integration tests for only_new=True using a real two-version DAG. diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..f7694b93a6f88 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -108,6 +108,10 @@ class BulkActionResponse(BaseModel): ] = [] +class Note(RootModel[str]): + root: Annotated[str, Field(max_length=1000, title="Note")] + + class BulkResponse(BaseModel): """ Serializer for responses to bulk entity operations. @@ -131,10 +135,6 @@ class BulkResponse(BaseModel): ] = None -class Note(RootModel[str]): - root: Annotated[str, Field(max_length=1000, title="Note")] - - class TaskIds(RootModel[list]): root: Annotated[list, Field(max_length=2, min_length=2)] @@ -364,6 +364,18 @@ class DagRunAssetReference(BaseModel): partition_key: Annotated[str | None, Field(title="Partition Key")] = None +class DagRunIdentifier(BaseModel): + """ + Identifier for a Dag run targeted by a bulk operation. + """ + + model_config = ConfigDict( + extra="forbid", + ) + dag_run_id: Annotated[str, Field(title="Dag Run Id")] + dag_id: Annotated[str | None, Field(title="Dag Id")] = None + + class DagRunState(str, Enum): """ All possible states that a DagRun can be in. @@ -1173,6 +1185,34 @@ class BackfillResponse(BaseModel): dag_display_name: Annotated[str, Field(title="Dag Display Name")] +class BulkClearDagRunsBody(BaseModel): + """ + Request body for the bulk clear Dag runs endpoint. + """ + + model_config = ConfigDict( + extra="forbid", + ) + runs: Annotated[list[DagRunIdentifier], Field(title="Runs")] + only_failed: Annotated[bool | None, Field(title="Only Failed")] = False + only_new: Annotated[ + bool | None, + Field( + description="Only queue newly added tasks in the latest Dag version without clearing existing tasks.", + title="Only New", + ), + ] = False + run_on_latest_version: Annotated[ + bool | None, + Field( + description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.", + title="Run On Latest Version", + ), + ] = False + dry_run: Annotated[bool | None, Field(title="Dry Run")] = True + note: Annotated[Note | None, Field(title="Note")] = None + + class BulkCreateActionConnectionBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -1212,6 +1252,34 @@ class BulkCreateActionVariableBody(BaseModel): action_on_existence: BulkActionOnExistence | None = "fail" +class BulkDagRunBody(BaseModel): + """ + Request body for bulk update and delete Dag runs (patch fields plus run identity). + """ + + model_config = ConfigDict( + extra="forbid", + ) + state: DAGRunPatchStates | None = None + note: Annotated[Note | None, Field(title="Note")] = None + dag_run_id: Annotated[str, Field(title="Dag Run Id")] + dag_id: Annotated[str | None, Field(title="Dag Id")] = None + + +class BulkDeleteActionBulkDagRunBody(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + action: Annotated[ + Literal["delete"], Field(description="The action to be performed on the entities.", title="Action") + ] + entities: Annotated[ + list[str | BulkDagRunBody], + Field(description="A list of entity id/key or entity objects to be deleted.", title="Entities"), + ] + action_on_non_existence: BulkActionNotOnExistence | None = "fail" + + class BulkDeleteActionConnectionBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -1274,6 +1342,26 @@ class BulkTaskInstanceBody(BaseModel): dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None +class BulkUpdateActionBulkDagRunBody(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + action: Annotated[ + Literal["update"], Field(description="The action to be performed on the entities.", title="Action") + ] + entities: Annotated[ + list[BulkDagRunBody], Field(description="A list of entities to be updated.", title="Entities") + ] + update_mask: Annotated[ + list[str] | None, + Field( + description="A list of field names to update for each entity.Only these fields will be applied from the request body to the database model.Any extra fields provided will be ignored.", + title="Update Mask", + ), + ] = None + action_on_non_existence: BulkActionNotOnExistence | None = "fail" + + class BulkUpdateActionBulkTaskInstanceBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -1931,6 +2019,19 @@ class BulkBodyVariableBody(BaseModel): ] +class BulkCreateActionBulkDagRunBody(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + action: Annotated[ + Literal["create"], Field(description="The action to be performed on the entities.", title="Action") + ] + entities: Annotated[ + list[BulkDagRunBody], Field(description="A list of entities to be created.", title="Entities") + ] + action_on_existence: BulkActionOnExistence | None = "fail" + + class BulkCreateActionBulkTaskInstanceBody(BaseModel): model_config = ConfigDict( extra="forbid", @@ -2144,6 +2245,18 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class BulkBodyBulkDagRunBody(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + actions: Annotated[ + list[ + BulkCreateActionBulkDagRunBody | BulkUpdateActionBulkDagRunBody | BulkDeleteActionBulkDagRunBody + ], + Field(title="Actions"), + ] + + class BulkBodyBulkTaskInstanceBody(BaseModel): model_config = ConfigDict( extra="forbid",