diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index 397389994a09f..c8e7ab2378dd1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -168,6 +168,7 @@ class ClearTaskInstancesBody(StrictBaseModel): "clearing the task instances.", ) prevent_running_task: bool = False + note: Annotated[str, StringConstraints(max_length=1000)] | None = None @model_validator(mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 17c4f87a6e44f..128d0f2bd0536 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -9904,6 +9904,12 @@ components: type: boolean title: Prevent Running Task default: false + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note additionalProperties: false type: object title: ClearTaskInstancesBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 97fa930b1c153..9488c0b7591fb 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -721,6 +721,7 @@ def post_clear_task_instances( dag_bag: DagBagDep, body: ClearTaskInstancesBody, session: SessionDep, + user: GetUserDep, ) -> TaskInstanceCollectionResponse: """Clear task instances.""" dag = get_latest_version_of_dag(dag_bag, dag_id, session) @@ -837,6 +838,13 @@ def _collect_relatives(run_id: str, direction: Literal["upstream", "downstream"] except AirflowClearRunningTaskException as e: raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e + if body.note is not None: + _patch_task_instance_note( + task_instance_body=body, + tis=task_instances, + user=user, + ) + return TaskInstanceCollectionResponse( task_instances=[TaskInstanceResponse.model_validate(ti) for ti in task_instances], total_entries=len(task_instances), diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 1c3d7c62913d3..8ab426bbef688 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -37,7 +37,11 @@ BulkDeleteAction, BulkUpdateAction, ) -from airflow.api_fastapi.core_api.datamodels.task_instances import BulkTaskInstanceBody, PatchTaskInstanceBody +from airflow.api_fastapi.core_api.datamodels.task_instances import ( + BulkTaskInstanceBody, + ClearTaskInstancesBody, + PatchTaskInstanceBody, +) from airflow.api_fastapi.core_api.security import GetUserDep from airflow.api_fastapi.core_api.services.public.common import BulkService from airflow.listeners.listener import get_listener_manager @@ -139,7 +143,7 @@ def _patch_task_instance_state( def _patch_task_instance_note( - task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody, + task_instance_body: BulkTaskInstanceBody | ClearTaskInstancesBody | PatchTaskInstanceBody, tis: list[TI], user: GetUserDep, update_mask: list[str] | None = Query(None), @@ -275,6 +279,7 @@ def _perform_update( dag_bag=self.dag_bag, body=entity, session=self.session, + map_index=map_index, update_mask=update_mask, ) @@ -318,12 +323,12 @@ def handle_bulk_update( try: specific_entity_map = { - (entity.dag_id, entity.dag_run_id, entity.task_id, entity.map_index): entity + self._extract_task_identifiers(entity): entity for entity in action.entities if entity.map_index is not None } all_map_entity_map = { - (entity.dag_id, entity.dag_run_id, entity.task_id): entity + self._extract_task_identifiers(entity)[:3]: entity for entity in action.entities if entity.map_index is None } diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index f47aa107aea24..0195c0b564aef 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1383,6 +1383,18 @@ export const $ClearTaskInstancesBody = { type: 'boolean', title: 'Prevent Running Task', default: false + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 7423c08d42c05..9c61f27e71e79 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -427,6 +427,7 @@ export type ClearTaskInstancesBody = { */ run_on_latest_version?: boolean; prevent_running_task?: boolean; + note?: string | null; }; /** diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 4176a91506254..8c42e3c5c59b7 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -193,6 +193,7 @@ "users": "Users" }, "selectLanguage": "Select Language", + "selected": "Selected", "showDetailsPanel": "Show Details Panel", "signedInAs": "Signed in as", "source": { @@ -293,6 +294,13 @@ "utc": "UTC (Coordinated Universal Time)" }, "toaster": { + "bulkClear": { + "error": "Bulk Clear {{resourceName}} Request Failed", + "success": { + "description": "{{count}} {{resourceName}} have been successfully cleared. Keys: {{keys}}", + "title": "Bulk Clear {{resourceName}} Request Submitted" + } + }, "bulkDelete": { "error": "Bulk Delete {{resourceName}} Request Failed", "success": { @@ -300,6 +308,13 @@ "title": "Bulk Delete {{resourceName}} Request Submitted" } }, + "bulkUpdate": { + "error": "Bulk Update {{resourceName}} Request Failed", + "success": { + "description": "{{count}} {{resourceName}} have been successfully updated. Keys: {{keys}}", + "title": "Bulk Update {{resourceName}} Request Submitted" + } + }, "create": { "error": "Create {{resourceName}} Request Failed", "success": { diff --git a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx index e2f5b274a4c10..514d93799d52f 100644 --- a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx +++ b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx @@ -20,7 +20,11 @@ import { Box, Editable, Text, VStack } from "@chakra-ui/react"; import type { ChangeEvent } from "react"; import { useTranslation } from "react-i18next"; -import type { DAGRunResponse, TaskInstanceCollectionResponse } from "openapi/requests/types.gen"; +import type { + DAGRunResponse, + TaskInstanceCollectionResponse, + TaskInstanceResponse, +} from "openapi/requests/types.gen"; import ReactMarkdown from "src/components/ReactMarkdown"; import { Accordion } from "src/components/ui"; @@ -29,17 +33,59 @@ import { getColumns } from "./columns"; type Props = { readonly affectedTasks?: TaskInstanceCollectionResponse; + readonly groupByRunId?: boolean; readonly note: DAGRunResponse["note"]; readonly setNote: (value: string) => void; }; +const TasksTable = ({ + noRowsMessage, + tasks, +}: { + readonly noRowsMessage: string; + readonly tasks: Array; +}) => { + const { t: translate } = useTranslation(); + const columns = getColumns(translate); + + return ( + + ); +}; + // Table is in memory, pagination and sorting are disabled. // TODO: Make a front-end only unconnected table component with client side ordering and pagination -const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => { +const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote }: Props) => { const showTaskSection = affectedTasks !== undefined; const { t: translate } = useTranslation(); - const columns = getColumns(translate); + // Group task instances by dag_run_id when requested + const runGroups = (() => { + if (!groupByRunId || !affectedTasks) { + return undefined; + } + + const map = new Map>(); + + for (const ti of affectedTasks.task_instances) { + const group = map.get(ti.dag_run_id) ?? []; + + group.push(ti); + map.set(ti.dag_run_id, group); + } + + return map; + })(); + + // Only group when there are actually multiple run IDs + const shouldGroup = groupByRunId && runGroups !== undefined && runGroups.size > 1; return ( { - + {shouldGroup ? ( + + {[...runGroups.entries()].map(([runId, tis]) => ( + + + + {translate("runId")}: {runId}{" "} + + ({tis.length}) + + + + + + + + ))} + + ) : ( + + )} diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx index 63e1df78526c4..5dc09c1f42141 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx @@ -57,7 +57,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr const downstream = selectedOptions.includes("downstream"); const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); - const [note, setNote] = useState(""); + const [note, setNote] = useState(null); const { data: dagDetails } = useDagServiceGetDagDetails({ dagId, @@ -186,6 +186,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr include_future: future, include_past: past, include_upstream: upstream, + ...(note === null ? {} : { note }), only_failed: onlyFailed, run_on_latest_version: runOnLatestVersion, task_ids: groupTaskIds, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx new file mode 100644 index 0000000000000..13bc6df292bae --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx @@ -0,0 +1,157 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Button, Flex, Heading, VStack, useDisclosure } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { CgRedo } from "react-icons/cg"; + +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Checkbox, Dialog } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkClearDryRun } from "src/queries/useBulkClearDryRun"; +import { useBulkClearTaskInstances } from "src/queries/useBulkClearTaskInstances"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkClearTaskInstancesButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [selectedOptions, setSelectedOptions] = useState>(["downstream"]); + const [note, setNote] = useState(null); + const [preventRunningTask, setPreventRunningTask] = useState(true); + const { bulkClear, error, isPending } = useBulkClearTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const handleClose = () => { + setNote(null); + onClose(); + }; + + const past = selectedOptions.includes("past"); + const future = selectedOptions.includes("future"); + const upstream = selectedOptions.includes("upstream"); + const downstream = selectedOptions.includes("downstream"); + const onlyFailed = selectedOptions.includes("onlyFailed"); + + const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date !== null); + + const { data: affectedTasks, isFetching } = useBulkClearDryRun(open, selectedTaskInstances, { + includeDownstream: downstream, + includeFuture: future, + includeOnlyFailed: onlyFailed, + includePast: past, + includeUpstream: upstream, + }); + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.clear.title", { + type: translate("taskInstance_other"), + })} + + + + + + + + + + + + + setPreventRunningTask(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.preventRunningTasks")} + + + + + + + + ); +}; + +export default BulkClearTaskInstancesButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx new file mode 100644 index 0000000000000..c3395d3be034f --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx @@ -0,0 +1,153 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { getColumns } from "src/components/ActionAccordion/columns"; +import { DataTable } from "src/components/DataTable"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Accordion, Dialog } from "src/components/ui"; +import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkDeleteTaskInstancesButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const { bulkAction, error, isPending } = useBulkTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const columns = getColumns(translate); + + // Group by dag_run_id for display + const byRunId = new Map>(); + + for (const ti of selectedTaskInstances) { + const group = byRunId.get(ti.dag_run_id) ?? []; + + group.push(ti); + byRunId.set(ti.dag_run_id, group); + } + + const isGrouped = byRunId.size > 1; + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.title", { + type: translate("taskInstance_other"), + })} + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.warning", { + type: translate("taskInstance_other"), + })} + + + + {isGrouped ? ( + + {[...byRunId.entries()].map(([runId, tis]) => ( + + + + {translate("runId")}: {runId}{" "} + + ({tis.length}) + + + + + + + + ))} + + ) : ( + + )} + + + + + + + + + + + ); +}; + +export default BulkDeleteTaskInstancesButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx new file mode 100644 index 0000000000000..e30b777ad9c93 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx @@ -0,0 +1,208 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Box, Button, Flex, Heading, HStack, VStack, useDisclosure } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { FiX } from "react-icons/fi"; +import { LuCheck } from "react-icons/lu"; + +import type { TaskInstanceResponse, TaskInstanceState } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { allowedStates } from "src/components/MarkAs/utils"; +import { StateBadge } from "src/components/StateBadge"; +import { Dialog, Menu } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkMarkAsDryRun } from "src/queries/useBulkMarkAsDryRun"; +import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkMarkTaskInstancesAsButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [state, setState] = useState("success"); + const [selectedOptions, setSelectedOptions] = useState>([]); + const [note, setNote] = useState(null); + const { bulkAction, error, isPending, setError } = useBulkTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const past = selectedOptions.includes("past"); + const future = selectedOptions.includes("future"); + const upstream = selectedOptions.includes("upstream"); + const downstream = selectedOptions.includes("downstream"); + + const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date !== null); + + const affectedCount = (targetState: TaskInstanceState) => + selectedTaskInstances.filter((ti) => ti.state !== targetState).length; + + const { data: affectedTasks, isFetching } = useBulkMarkAsDryRun(open, { + options: { + includeDownstream: downstream, + includeFuture: future, + includePast: past, + includeUpstream: upstream, + }, + selectedTaskInstances, + targetState: state, + }); + + const handleOpen = (newState: TaskInstanceState) => { + setState(newState); + setSelectedOptions([]); + setNote(null); + setError(undefined); + onOpen(); + }; + + const directlyAffected = selectedTaskInstances.filter((ti) => ti.state !== state); + + return ( + + + +
+ +
+
+ + {allowedStates.map((menuState) => { + const count = affectedCount(menuState); + + return ( + { + if (count > 0) { + handleOpen(menuState); + } + }} + value={menuState} + > + + {translate(`common:states.${menuState}`)} + + {count} + + + + ); + })} + +
+ + + + + + + {translate("dags:runAndTaskActions.markAs.title", { + state, + type: translate("taskInstance_other"), + })}{" "} + + + + + + + + + + + + + + + + + + +
+ ); +}; + +export default BulkMarkTaskInstancesAsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index c63cffd54c193..273a19ef98399 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -29,21 +29,29 @@ import type { TaskInstanceResponse } from "openapi/requests/types.gen"; import { ClearTaskInstanceButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; +import { useRowSelection, type GetColumnsParams } from "src/components/DataTable/useRowSelection"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { MarkTaskInstanceAsButton } from "src/components/MarkAs"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; +import { ActionBar } from "src/components/ui/ActionBar"; +import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAutoRefresh, isStatePending, renderDuration } from "src/utils"; import { getTaskInstanceLink } from "src/utils/links"; +import BulkClearTaskInstancesButton from "./BulkClearTaskInstancesButton"; +import BulkDeleteTaskInstancesButton from "./BulkDeleteTaskInstancesButton"; +import BulkMarkTaskInstancesAsButton from "./BulkMarkTaskInstancesAsButton"; import DeleteTaskInstanceButton from "./DeleteTaskInstanceButton"; import { TaskInstancesFilter } from "./TaskInstancesFilter"; type TaskInstanceRow = { row: { original: TaskInstanceResponse } }; +const getRowKey = (ti: TaskInstanceResponse) => `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + const { DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, DAG_VERSION: DAG_VERSION_PARAM, @@ -63,17 +71,47 @@ const { TRY_NUMBER: TRY_NUMBER_PARAM, }: SearchParamsKeysType = SearchParamsKeys; +type ColumnProps = { + readonly dagId?: string; + readonly runId?: string; + readonly taskId?: string; + readonly translate: TFunction; +}; + const taskInstanceColumns = ({ + allRowsSelected, dagId, + onRowSelect, + onSelectAll, runId, + selectedRows, taskId, translate, -}: { - dagId?: string; - runId?: string; - taskId?: string; - translate: TFunction; -}): Array> => [ +}: ColumnProps & GetColumnsParams): Array> => [ + { + accessorKey: "select", + cell: ({ row }) => ( + onRowSelect(getRowKey(row.original), Boolean(event.checked))} + /> + ), + enableHiding: false, + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, ...(Boolean(dagId) ? [] : [ @@ -95,7 +133,6 @@ const taskInstanceColumns = ({ : [ { accessorKey: "run_after", - // If we don't show the taskId column, make the dag run a link to the task instance cell: ({ row: { original } }: TaskInstanceRow) => Boolean(taskId) ? ( @@ -292,9 +329,22 @@ export const TaskInstances = () => { }, ); + const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, selectedRows } = + useRowSelection({ + data: data?.task_instances, + getKey: getRowKey, + }); + + const selectedTaskInstances = (data?.task_instances ?? []).filter((ti) => selectedRows.has(getRowKey(ti))); + const columns = taskInstanceColumns({ + allRowsSelected, dagId, + multiTeam: false, + onRowSelect: handleRowSelect, + onSelectAll: handleSelectAll, runId, + selectedRows, taskId: Boolean(groupId) ? undefined : taskId, translate, }); @@ -312,6 +362,27 @@ export const TaskInstances = () => { onStateChange={setTableURLState} total={data?.total_entries} /> + + + + {selectedRows.size} {translate("selected")} + + + + + + + + ); }; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts new file mode 100644 index 0000000000000..e631860f300c7 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts @@ -0,0 +1,113 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueries } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; + +type Options = { + includeDownstream: boolean; + includeFuture: boolean; + includeOnlyFailed: boolean; + includePast: boolean; + includeUpstream: boolean; +}; + +const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], total_entries: 0 }; + +export const useBulkClearDryRunKey = "bulkClearDryRun"; + +export const useBulkClearDryRun = ( + enabled: boolean, + selectedTaskInstances: Array, + options: Options, +) => { + const byDagRun = useMemo(() => { + const groups = new Map }>(); + + for (const ti of selectedTaskInstances) { + const key = `${ti.dag_id}::${ti.dag_run_id}`; + const group = groups.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] }; + + group.tis.push(ti); + groups.set(key, group); + } + + return [...groups.values()]; + }, [selectedTaskInstances]); + + const results = useQueries({ + queries: byDagRun.map(({ dagId, dagRunId, tis }) => ({ + enabled, + queryFn: () => + TaskInstanceService.postClearTaskInstances({ + dagId, + requestBody: { + dag_run_id: dagRunId, + dry_run: true, + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + only_failed: options.includeOnlyFailed, + task_ids: tis.map((ti) => + ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id, + ), + }, + }), + queryKey: [ + useBulkClearDryRunKey, + dagId, + dagRunId, + { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_only_failed: options.includeOnlyFailed, + include_past: options.includePast, + include_upstream: options.includeUpstream, + task_ids: tis.map((ti) => `${ti.task_id}:${ti.map_index}`), + }, + ], + refetchOnMount: "always" as const, + })), + }); + + const isFetching = results.some((result) => result.isFetching); + + const data = useMemo(() => { + const seen = new Set(); + const merged: Array = []; + + for (const result of results) { + for (const ti of result.data?.task_instances ?? []) { + const key = `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + + if (!seen.has(key)) { + seen.add(key); + merged.push(ti); + } + } + } + + return merged.length === 0 ? EMPTY : { task_instances: merged, total_entries: merged.length }; + }, [results]); + + return { data, isFetching }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts new file mode 100644 index 0000000000000..063b1c6d77d86 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts @@ -0,0 +1,118 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { useDagRunServiceGetDagRunsKey, useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries"; +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export type BulkClearOptions = { + includeDownstream: boolean; + includeFuture: boolean; + includeOnlyFailed: boolean; + includePast: boolean; + includeUpstream: boolean; + note: string | null; + preventRunningTask: boolean; +}; + +export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const [isPending, setIsPending] = useState(false); + const { t: translate } = useTranslation(["common", "dags"]); + + const invalidateQueries = async () => { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + ]); + }; + + const bulkClear = async (taskInstances: Array, options: BulkClearOptions) => { + setError(undefined); + setIsPending(true); + + // Group by (dag_id, dag_run_id) — clear endpoint requires a specific dag_id + // and dag_run_id scopes the clear to the specific run + const byDagRun = new Map }>(); + + for (const ti of taskInstances) { + const key = `${ti.dag_id}::${ti.dag_run_id}`; + const group = byDagRun.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] }; + + group.tis.push(ti); + byDagRun.set(key, group); + } + + try { + await Promise.all( + [...byDagRun.values()].map(({ dagId, dagRunId, tis }) => + TaskInstanceService.postClearTaskInstances({ + dagId, + requestBody: { + dag_run_id: dagRunId, + dry_run: false, + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + note: options.note, + only_failed: options.includeOnlyFailed, + ...(options.preventRunningTask ? { prevent_running_task: true } : {}), + task_ids: tis.map((ti) => + ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id, + ), + }, + }), + ), + ); + + await invalidateQueries(); + + toaster.create({ + description: translate("toaster.bulkClear.success.description", { + count: taskInstances.length, + keys: taskInstances.map((ti) => ti.task_id).join(", "), + resourceName: translate("taskInstance_other"), + }), + title: translate("toaster.bulkClear.success.title", { + resourceName: translate("taskInstance_other"), + }), + type: "success", + }); + + clearSelections(); + onSuccessConfirm(); + } catch (_error) { + setError(_error); + } + setIsPending(false); + }; + + return { bulkClear, error, isPending, setError }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts new file mode 100644 index 0000000000000..b8d5eb0675852 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts @@ -0,0 +1,113 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueries } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { + TaskInstanceCollectionResponse, + TaskInstanceResponse, + TaskInstanceState, +} from "openapi/requests/types.gen"; + +type Options = { + includeDownstream: boolean; + includeFuture: boolean; + includePast: boolean; + includeUpstream: boolean; +}; + +const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], total_entries: 0 }; + +export const useBulkMarkAsDryRunKey = "bulkMarkAsDryRun"; + +export const useBulkMarkAsDryRun = ( + enabled: boolean, + { + options, + selectedTaskInstances, + targetState, + }: { + options: Options; + selectedTaskInstances: Array; + targetState: TaskInstanceState; + }, +) => { + const affectedInstances = useMemo( + () => selectedTaskInstances.filter((ti) => ti.state !== targetState), + [selectedTaskInstances, targetState], + ); + + const results = useQueries({ + queries: affectedInstances.map((ti) => ({ + enabled, + queryFn: () => + TaskInstanceService.patchTaskInstanceDryRun({ + dagId: ti.dag_id, + dagRunId: ti.dag_run_id, + mapIndex: ti.map_index, + requestBody: { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + new_state: targetState, + }, + taskId: ti.task_id, + }), + queryKey: [ + useBulkMarkAsDryRunKey, + ti.dag_id, + ti.dag_run_id, + ti.task_id, + ti.map_index, + { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + new_state: targetState, + }, + ], + refetchOnMount: "always" as const, + })), + }); + + const isFetching = results.some((result) => result.isFetching); + + const data = useMemo(() => { + const seen = new Set(); + const merged: Array = []; + + for (const result of results) { + for (const ti of result.data?.task_instances ?? []) { + const key = `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + + if (!seen.has(key)) { + seen.add(key); + merged.push(ti); + } + } + } + + return merged.length === 0 ? EMPTY : { task_instances: merged, total_entries: merged.length }; + }, [results]); + + return { data, isFetching }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts new file mode 100644 index 0000000000000..8ad4d616197f5 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts @@ -0,0 +1,105 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceBulkTaskInstances, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import type { + BulkActionResponse, + BulkBody_BulkTaskInstanceBody_, + BulkResponse, +} from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +const handleActionResult = ( + actionResult: BulkActionResponse, + setError: (error: unknown) => void, + onSuccess: (count: number, keys: Array) => void, +) => { + const { errors, success } = actionResult; + + if (Array.isArray(errors) && errors.length > 0) { + const apiError = errors[0] as { error: string }; + + setError({ body: { detail: apiError.error } }); + } else if (Array.isArray(success) && success.length > 0) { + onSuccess(success.length, success); + } +}; + +export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const { t: translate } = useTranslation(["common", "dags"]); + + const onSuccess = async (responseData: BulkResponse) => { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + ]); + + const isDelete = Boolean(responseData.delete); + const actionResult = responseData.delete ?? responseData.update; + const toasterKey = isDelete ? "toaster.bulkDelete" : "toaster.bulkUpdate"; + + if (actionResult) { + handleActionResult(actionResult, setError, (count, keys) => { + toaster.create({ + description: translate(`${toasterKey}.success.description`, { + count, + keys: keys.join(", "), + resourceName: translate("taskInstance_other"), + }), + title: translate(`${toasterKey}.success.title`, { + resourceName: translate("taskInstance_other"), + }), + type: "success", + }); + clearSelections(); + onSuccessConfirm(); + }); + } + }; + + const onError = (_error: unknown) => { + setError(_error); + }; + + const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ + onError, + onSuccess, + }); + + const bulkAction = (requestBody: BulkBody_BulkTaskInstanceBody_) => { + setError(undefined); + mutate({ dagId: "~", dagRunId: "~", requestBody }); + }; + + return { bulkAction, error, isPending, setError }; +}; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index ba8f06a742459..1a5d689d1ef6b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -3644,6 +3644,88 @@ def test_dry_run_audit_log(self, test_client, session, dry_run, audit_log_count) assert response.status_code == 200 assert logs == audit_log_count + @pytest.mark.db_test + def test_clear_sets_note_on_task_instances(self, test_client, session): + """Test that a note is set on cleared task instances when note is provided.""" + dag_id = "example_python_operator" + note_value = "Cleared by automation" + payload = { + "dry_run": False, + "reset_dag_runs": False, + "only_failed": True, + "only_running": False, + "note": note_value, + } + self.create_task_instances( + session, + dag_id=dag_id, + task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}], + update_extras=False, + ) + response = test_client.post( + f"/dags/{dag_id}/clearTaskInstances", + json=payload, + ) + assert response.status_code == 200 + response_data = response.json() + assert response_data["total_entries"] == 1 + ti_id = response_data["task_instances"][0]["id"] + _check_task_instance_note(session, ti_id, {"content": note_value, "user_id": "test"}) + + @pytest.mark.db_test + def test_clear_without_note_does_not_set_note(self, test_client, session): + """Test that existing note is preserved on cleared task instances when note is not provided.""" + dag_id = "example_python_operator" + payload = { + "dry_run": False, + "reset_dag_runs": False, + "only_failed": True, + "only_running": False, + } + self.create_task_instances( + session, + dag_id=dag_id, + task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}], + update_extras=False, + ) + response = test_client.post( + f"/dags/{dag_id}/clearTaskInstances", + json=payload, + ) + assert response.status_code == 200 + response_data = response.json() + assert response_data["total_entries"] == 1 + ti_id = response_data["task_instances"][0]["id"] + _check_task_instance_note(session, ti_id, {"content": "placeholder-note", "user_id": None}) + + @pytest.mark.db_test + def test_clear_dry_run_does_not_set_note(self, test_client, session): + """Test that a note is NOT updated when dry_run=True even if note is provided.""" + dag_id = "example_python_operator" + note_value = "Should not be set" + payload = { + "dry_run": True, + "reset_dag_runs": False, + "only_failed": True, + "only_running": False, + "note": note_value, + } + self.create_task_instances( + session, + dag_id=dag_id, + task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}], + update_extras=False, + ) + response = test_client.post( + f"/dags/{dag_id}/clearTaskInstances", + json=payload, + ) + assert response.status_code == 200 + response_data = response.json() + assert response_data["total_entries"] == 1 + ti_id = response_data["task_instances"][0]["id"] + _check_task_instance_note(session, ti_id, {"content": "placeholder-note", "user_id": None}) + class TestGetTaskInstanceTries(TestTaskInstanceEndpoint): def test_should_respond_200(self, test_client, session): @@ -6001,6 +6083,64 @@ def test_bulk_task_instances( for task_id, value in expected_results.items(): assert sorted(response_data[task_id]) == sorted(value) + @pytest.mark.parametrize( + ("map_index", "new_state"), + [ + pytest.param(0, "failed", id="mapped-ti-map-index-0-failed"), + pytest.param(1, "failed", id="mapped-ti-map-index-1-failed"), + pytest.param(2, "success", id="mapped-ti-map-index-2-success"), + ], + ) + def test_bulk_update_mapped_task_instance_state_is_persisted( + self, test_client, session, map_index, new_state + ): + """Verify that bulk-updating a specific mapped TI actually persists the new state in the DB.""" + self.create_task_instances( + session, + task_instances=[{"state": State.RUNNING, "map_indexes": (0, 1, 2)}], + ) + + response = test_client.patch( + self.ENDPOINT_URL, + json={ + "actions": [ + { + "action": "update", + "entities": [ + { + "task_id": self.TASK_ID, + "map_index": map_index, + "new_state": new_state, + } + ], + } + ] + }, + ) + assert response.status_code == 200 + assert response.json()["update"]["success"] == [ + f"{self.DAG_ID}.{self.RUN_ID}.{self.TASK_ID}[{map_index}]" + ] + + session.expire_all() + # Verify only the targeted mapped TI changed state; others remain unchanged. + for mi in [0, 1, 2]: + ti = session.scalar( + select(TaskInstance).where( + TaskInstance.dag_id == self.DAG_ID, + TaskInstance.run_id == self.RUN_ID, + TaskInstance.task_id == self.TASK_ID, + TaskInstance.map_index == mi, + ) + ) + assert ti is not None + if mi == map_index: + assert ti.state == new_state, f"Expected map_index={mi} to be {new_state!r}, got {ti.state!r}" + else: + assert ti.state == State.RUNNING, ( + f"Expected map_index={mi} to remain running, got {ti.state!r}" + ) + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.patch(self.ENDPOINT_URL, json={}) assert response.status_code == 401 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index bb375f888349c..1fe3871370357 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -173,6 +173,7 @@ class ClearTaskInstancesBody(BaseModel): ), ] = False prevent_running_task: Annotated[bool | None, Field(title="Prevent Running Task")] = False + note: Annotated[Note | None, Field(title="Note")] = None class Value(RootModel[list]):