From 7c49615a89dcac0ca4fdeeb81948ae3dce137baa Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 19 Aug 2025 22:12:13 +0800 Subject: [PATCH 1/8] Add bulk TI deletion UI --- .../ui/public/i18n/locales/en/common.json | 8 + .../ui/public/i18n/locales/zh-TW/common.json | 7 + .../ActionAccordion/ActionAccordion.tsx | 72 ++++----- .../DeleteTaskInstancesButton.tsx | 140 ++++++++++++++++++ .../src/pages/TaskInstances/TaskInstances.tsx | 113 +++++++++++++- .../src/queries/useBulkDeleteTaskInstances.ts | 87 +++++++++++ 6 files changed, 387 insertions(+), 40 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 9fd0cc4efe940..55ba1704c1921 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -25,6 +25,12 @@ "requiredActions": "Required Actions", "xcoms": "XComs" }, + "bulkAction": { + "success": { + "description": "{{count}} {{type}} {{action}} successfully. Keys: {{keys}}", + "title": "{{action}} {{type}} Request Successful" + } + }, "collapseAllExtra": "Collapse all extra json", "collapseDetailsPanel": "Collapse Details Panel", "createdAssetEvent_one": "Created Asset Event", @@ -74,6 +80,7 @@ "dagWarnings": "Dag warnings/errors", "defaultToGraphView": "Default to graph view", "defaultToGridView": "Default to grid view", + "delete": "Delete", "direction": "Direction", "docs": { "documentation": "Documentation", @@ -183,6 +190,7 @@ "users": "Users" }, "selectLanguage": "Select Language", + "selected": "Selected", "showDetailsPanel": "Show Details Panel", "signedInAs": "Signed in as", "source": { diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index d54d3b6ba5a88..543a2e7ed4d87 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -25,6 +25,12 @@ "requiredActions": "待回應的任務實例", "xcoms": "XComs" }, + "bulkAction": { + "success": { + "description": "{{count}} {{type}}{{action}}成功。鍵:{{keys}}", + "title": "{{action}}{{type}}請求成功" + } + }, "collapseDetailsPanel": "收起詳細資訊", "createdAssetEvent_one": "已建立資源事件", "createdAssetEvent_other": "已建立資源事件", @@ -73,6 +79,7 @@ "dagWarnings": "Dag 警告 / 錯誤", "defaultToGraphView": "預設使用圖形視圖", "defaultToGridView": "預設使用網格視圖", + "delete": "刪除", "direction": "書寫方向", "docs": { "documentation": "文件", diff --git a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx index a08b9744b51b0..a84f6771b6b0c 100644 --- a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx +++ b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx @@ -30,8 +30,8 @@ import { getColumns } from "./columns"; type Props = { readonly affectedTasks?: TaskInstanceCollectionResponse; - readonly note: DAGRunResponse["note"]; - readonly setNote: (value: string) => void; + readonly note?: DAGRunResponse["note"]; + readonly setNote?: (value: string) => void; }; // Table is in memory, pagination and sorting are disabled. @@ -72,40 +72,42 @@ const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => { ) : undefined} - - - {translate("note.label")} - - - ) => setNote(event.target.value)} - value={note ?? ""} - > - + + {translate("note.label")} + + + ) => setNote(event.target.value)} + value={note ?? ""} > - {Boolean(note) ? ( - {note} - ) : ( - {translate("note.placeholder")} - )} - - - - - + + {Boolean(note) ? ( + {note} + ) : ( + {translate("note.placeholder")} + )} + + + + + + ) : undefined} ); }; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx new file mode 100644 index 0000000000000..679b9c10d947a --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx @@ -0,0 +1,140 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useDisclosure } from "@chakra-ui/react"; +import { Button, Flex, Heading, Text } from "@chakra-ui/react"; +import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from "openapi-gen/requests/types.gen"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import { ActionAccordion } from "src/components/ActionAccordion"; +import ActionButton from "src/components/ui/ActionButton"; +import { Dialog } from "src/components/ui/Dialog"; +import { useBulkDeleteTaskInstances } from "src/queries/useBulkDeleteTaskInstances"; + +type Props = { + readonly clearSelections: () => void; + readonly dagId: string; + readonly dagRunId: string; + readonly deleteKeys: Array; +}; + +const DeleteTaskInstancesButton = ({ clearSelections, dagId, dagRunId, deleteKeys }: Props) => { + const { onClose, onOpen, open } = useDisclosure(); + const { isPending, mutate } = useBulkDeleteTaskInstances({ + dagId, + dagRunId, + onSuccessConfirm: () => { + clearSelections(); + onClose(); + }, + }); + const { t: translate } = useTranslation(); + + if (deleteKeys.length === 0) { + return undefined; + } + + const type = translate("common:taskInstance_other"); + const title = translate("dags:runAndTaskActions.delete.dialog.title", { type }); + const warningText = translate("dags:runAndTaskActions.delete.dialog.warning", { type }); + const deleteButtonText = translate("dags:runAndTaskActions.delete.button", { type }); + + const handleDelete = () => { + if (dagId && dagRunId) { + mutate({ + dagId, + dagRunId, + requestBody: { + actions: [ + { + action: "delete", + entities: deleteKeys.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), + }, + ], + }, + }); + } else { + // cross dag run + const groupedByDagRunTIs: Record> = {}; + + deleteKeys.forEach((ti) => { + (groupedByDagRunTIs[ti.dag_run_id] ??= []).push(ti); + }); + + Object.entries(groupedByDagRunTIs).forEach(([groupDagRunId, groupTIs]) => { + if (dagId && groupDagRunId) { + mutate({ + dagId, + dagRunId: groupDagRunId, + requestBody: { + actions: [ + { + action: "delete", + entities: groupTIs.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), + }, + ], + }, + }); + } + }); + } + }; + + const affectedTasks = { + task_instances: deleteKeys, + total_entries: deleteKeys.length, + } as TaskInstanceCollectionResponse; + + return ( + <> + } + onClick={onOpen} + text={deleteButtonText} + variant="outline" + withText + /> + + + + + {title} + + {warningText} + + + + + + + + + + + ); +}; + +export default DeleteTaskInstancesButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index f4e0e14aca6c5..227225056cedd 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -1,3 +1,5 @@ +/* eslint-disable max-lines */ + /*! * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,7 +23,7 @@ import { Flex, Link } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import type { TFunction } from "i18next"; -import { useMemo } from "react"; +import { useMemo, useMemo } from "react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; @@ -30,19 +32,26 @@ import type { TaskInstanceResponse } from "openapi/requests/types.gen"; import { ClearTaskInstanceButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; +import { type GetColumnsParams, useRowSelection } from "src/components/DataTable/useRowSelection"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { MarkTaskInstanceAsButton } from "src/components/MarkAs"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; +import { ActionBar } from "src/components/ui/ActionBar"; +import { Checkbox } from "src/components/ui/Checkbox"; +import { Tooltip } from "src/components/ui/Tooltip"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAutoRefresh, isStatePending, renderDuration } from "src/utils"; import { getTaskInstanceLink } from "src/utils/links"; import DeleteTaskInstanceButton from "./DeleteTaskInstanceButton"; +import DeleteTaskInstancesButton from "./DeleteTaskInstancesButton"; import { TaskInstancesFilter } from "./TaskInstancesFilter"; +const SEPARATOR = "SEPARATOR"; + type TaskInstanceRow = { row: { original: TaskInstanceResponse } }; const { @@ -64,9 +73,13 @@ const { TRY_NUMBER: TRY_NUMBER_PARAM, }: SearchParamsKeysType = SearchParamsKeys; -const taskInstanceColumns = ({ +const getColumns = ({ + allRowsSelected, dagId, + onRowSelect, + onSelectAll, runId, + selectedRows, taskId, translate, }: { @@ -74,7 +87,41 @@ const taskInstanceColumns = ({ runId?: string; taskId?: string; translate: TFunction; -}): Array> => [ +} & GetColumnsParams): Array> => [ + ...(Boolean(dagId) + ? [ + { + accessorKey: "select", + cell: ({ row }: TaskInstanceRow) => ( + + onRowSelect( + `${row.original.dag_run_id}${SEPARATOR}${row.original.task_id}${SEPARATOR}${row.original.map_index}`, + Boolean(event.checked), + ) + } + /> + ), + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, + ] + : []), ...(Boolean(dagId) ? [] : [ @@ -291,15 +338,36 @@ export const TaskInstances = () => { }, ); + const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, selectedRows } = + useRowSelection({ + data: data?.task_instances, + getKey: (taskInstance) => + `${taskInstance.dag_run_id}${SEPARATOR}${taskInstance.task_id}${SEPARATOR}${taskInstance.map_index}`, + }); + const columns = useMemo( () => - taskInstanceColumns({ + getColumns({ + allRowsSelected, dagId, + onRowSelect: handleRowSelect, + onSelectAll: handleSelectAll, runId, + selectedRows, taskId: Boolean(groupId) ? undefined : taskId, translate, }), - [dagId, runId, groupId, taskId, translate], + [ + allRowsSelected, + dagId, + groupId, + handleRowSelect, + handleSelectAll, + runId, + selectedRows, + taskId, + translate, + ], ); return ( @@ -315,6 +383,41 @@ export const TaskInstances = () => { onStateChange={setTableURLState} total={data?.total_entries} /> + + + + {selectedRows.size} {translate("common:selected")} + + + 1 ? "taskInstance_other" : "taskInstance_one"}`, + )}`} + disabled={selectedRows.size === 0} + > + { + const [dagRunId, currentTaskId, mapIndex] = id.split(SEPARATOR); + + return data?.task_instances.find( + (ti) => + ti.dag_run_id === dagRunId && + ti.task_id === currentTaskId && + ti.map_index === Number(mapIndex ?? -1), + ); + }) + .filter(Boolean) as Array + } + /> + + + + ); }; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts new file mode 100644 index 0000000000000..8b32e75b26640 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts @@ -0,0 +1,87 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useTaskInstanceServiceGetTaskInstancesKey, + useTaskInstanceServiceBulkTaskInstances, + UseGridServiceGetGridTiSummariesKeyFn, +} from "openapi/queries"; +import { toaster } from "src/components/ui"; + +type Props = { + readonly dagId: string; + readonly dagRunId: string; + readonly onSuccessConfirm: VoidFunction; +}; + +export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const { t: translate } = useTranslation("common"); + + const onSuccess = async (responseData: { delete?: { errors: Array; success: Array } }) => { + const queryKeys = [ + [useTaskInstanceServiceGetTaskInstancesKey], + UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }), + ]; + + await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); + + if (responseData.delete) { + const { errors, success } = responseData.delete; + + if (Array.isArray(errors) && errors.length > 0) { + const apiError = errors[0] as { error: string }; + + setError({ + body: { detail: apiError.error }, + }); + } else if (Array.isArray(success) && success.length > 0) { + toaster.create({ + description: translate("bulkAction.success.description", { + action: translate("delete"), + count: success.length, + keys: success.join(", "), + type: translate("taskInstance", { count: success.length }), + }), + title: translate("bulkAction.success.title", { + action: translate("delete"), + type: translate("taskInstance", { count: success.length }), + }), + type: "success", + }); + onSuccessConfirm(); + } + } + }; + + const onError = (_error: unknown) => { + setError(_error); + }; + + const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ + onError, + onSuccess, + }); + + return { error, isPending, mutate }; +}; From d8529be7ec887481bba4f8d51683c2ff0cdc81e0 Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Fri, 29 Aug 2025 19:37:11 +0800 Subject: [PATCH 2/8] Separate translation descriptions for each action --- .../src/airflow/ui/public/i18n/locales/en/common.json | 10 +++++++--- .../airflow/ui/public/i18n/locales/zh-TW/common.json | 10 +++++++--- .../ui/src/queries/useBulkDeleteTaskInstances.ts | 9 ++------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 55ba1704c1921..e9480299f94f4 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -26,9 +26,13 @@ "xcoms": "XComs" }, "bulkAction": { - "success": { - "description": "{{count}} {{type}} {{action}} successfully. Keys: {{keys}}", - "title": "{{action}} {{type}} Request Successful" + "delete": { + "taskInstance": { + "success": { + "description": "{{count}} task instances deleted successfully. Keys: {{keys}}", + "title": "Delete Task Instances Request Successful" + } + } } }, "collapseAllExtra": "Collapse all extra json", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index 543a2e7ed4d87..70065ca125597 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -26,9 +26,13 @@ "xcoms": "XComs" }, "bulkAction": { - "success": { - "description": "{{count}} {{type}}{{action}}成功。鍵:{{keys}}", - "title": "{{action}}{{type}}請求成功" + "delete": { + "taskInstance": { + "success": { + "description": "已成功刪除 {{count}} 個任務實例。鍵:{{keys}}", + "title": "已提交刪除任務實例請求" + } + } } }, "collapseDetailsPanel": "收起詳細資訊", diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts index 8b32e75b26640..01871859a615e 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts @@ -57,16 +57,11 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } }); } else if (Array.isArray(success) && success.length > 0) { toaster.create({ - description: translate("bulkAction.success.description", { - action: translate("delete"), + description: translate("bulkAction.delete.taskInstance.success.description", { count: success.length, keys: success.join(", "), - type: translate("taskInstance", { count: success.length }), - }), - title: translate("bulkAction.success.title", { - action: translate("delete"), - type: translate("taskInstance", { count: success.length }), }), + title: translate("bulkAction.delete.taskInstance.success.title"), type: "success", }); onSuccessConfirm(); From 8e2eff54c1d0d94bbefd55ffb143ac5fc921a2cb Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Fri, 29 Aug 2025 20:18:36 +0800 Subject: [PATCH 3/8] Handle cross dag bulk deletion --- .../DeleteTaskInstancesButton.tsx | 45 +------------ .../src/pages/TaskInstances/TaskInstances.tsx | 64 +++++++++---------- .../src/queries/useBulkDeleteTaskInstances.ts | 56 ++++++++++++++-- 3 files changed, 84 insertions(+), 81 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx index 679b9c10d947a..34692e27a23ad 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx @@ -36,7 +36,7 @@ type Props = { const DeleteTaskInstancesButton = ({ clearSelections, dagId, dagRunId, deleteKeys }: Props) => { const { onClose, onOpen, open } = useDisclosure(); - const { isPending, mutate } = useBulkDeleteTaskInstances({ + const { deleteTaskInstances, isPending } = useBulkDeleteTaskInstances({ dagId, dagRunId, onSuccessConfirm: () => { @@ -55,47 +55,6 @@ const DeleteTaskInstancesButton = ({ clearSelections, dagId, dagRunId, deleteKey const warningText = translate("dags:runAndTaskActions.delete.dialog.warning", { type }); const deleteButtonText = translate("dags:runAndTaskActions.delete.button", { type }); - const handleDelete = () => { - if (dagId && dagRunId) { - mutate({ - dagId, - dagRunId, - requestBody: { - actions: [ - { - action: "delete", - entities: deleteKeys.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), - }, - ], - }, - }); - } else { - // cross dag run - const groupedByDagRunTIs: Record> = {}; - - deleteKeys.forEach((ti) => { - (groupedByDagRunTIs[ti.dag_run_id] ??= []).push(ti); - }); - - Object.entries(groupedByDagRunTIs).forEach(([groupDagRunId, groupTIs]) => { - if (dagId && groupDagRunId) { - mutate({ - dagId, - dagRunId: groupDagRunId, - requestBody: { - actions: [ - { - action: "delete", - entities: groupTIs.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), - }, - ], - }, - }); - } - }); - } - }; - const affectedTasks = { task_instances: deleteKeys, total_entries: deleteKeys.length, @@ -125,7 +84,7 @@ const DeleteTaskInstancesButton = ({ clearSelections, dagId, dagRunId, deleteKey - diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index 227225056cedd..2579cc83435d4 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -88,40 +88,36 @@ const getColumns = ({ taskId?: string; translate: TFunction; } & GetColumnsParams): Array> => [ - ...(Boolean(dagId) - ? [ - { - accessorKey: "select", - cell: ({ row }: TaskInstanceRow) => ( - - onRowSelect( - `${row.original.dag_run_id}${SEPARATOR}${row.original.task_id}${SEPARATOR}${row.original.map_index}`, - Boolean(event.checked), - ) - } - /> - ), - enableSorting: false, - header: () => ( - onSelectAll(Boolean(event.checked))} - /> - ), - meta: { - skeletonWidth: 10, - }, - }, - ] - : []), + { + accessorKey: "select", + cell: ({ row }: TaskInstanceRow) => ( + + onRowSelect( + `${row.original.dag_run_id}${SEPARATOR}${row.original.task_id}${SEPARATOR}${row.original.map_index}`, + Boolean(event.checked), + ) + } + /> + ), + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, ...(Boolean(dagId) ? [] : [ diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts index 01871859a615e..59b620fbe379a 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts @@ -25,14 +25,17 @@ import { useTaskInstanceServiceBulkTaskInstances, UseGridServiceGetGridTiSummariesKeyFn, } from "openapi/queries"; +import type { TaskInstanceResponse } from "openapi/requests"; import { toaster } from "src/components/ui"; type Props = { - readonly dagId: string; - readonly dagRunId: string; + readonly dagId?: string; + readonly dagRunId?: string; readonly onSuccessConfirm: VoidFunction; }; +const SEPARATOR = "SEPARATOR"; + export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm }: Props) => { const queryClient = useQueryClient(); const [error, setError] = useState(undefined); @@ -41,7 +44,9 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } const onSuccess = async (responseData: { delete?: { errors: Array; success: Array } }) => { const queryKeys = [ [useTaskInstanceServiceGetTaskInstancesKey], - UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }), + dagId === undefined || dagRunId === undefined + ? [] + : [UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId })], ]; await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); @@ -78,5 +83,48 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } onSuccess, }); - return { error, isPending, mutate }; + const deleteTaskInstances = (entities: Array) => { + if (Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined) { + mutate({ + dagId, + dagRunId, + requestBody: { + actions: [ + { + action: "delete", + entities: entities.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), + }, + ], + }, + }); + } else { + // cross dag run + const groupedByDagRunTIs: Record> = {}; + + entities.forEach((ti) => { + (groupedByDagRunTIs[`${ti.dag_id}${SEPARATOR}${ti.dag_run_id}`] ??= []).push(ti); + }); + + Object.entries(groupedByDagRunTIs).forEach(([key, groupTIs]) => { + const [groupDagId, groupDagRunId] = key.split(SEPARATOR); + + if (groupDagId !== undefined && groupDagRunId !== undefined) { + mutate({ + dagId: groupDagId, + dagRunId: groupDagRunId, + requestBody: { + actions: [ + { + action: "delete", + entities: groupTIs.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), + }, + ], + }, + }); + } + }); + } + }; + + return { deleteTaskInstances, error, isPending }; }; From 1898b9ccd75c8dc909f4452ffd97c2cd94011066 Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Sun, 31 Aug 2025 18:14:25 +0800 Subject: [PATCH 4/8] Add bulk mark as UI --- .../MarkTaskInstancesAsButton.tsx | 106 +++++++++++ .../MarkTaskInstancesAsDialog.tsx | 109 +++++++++++ .../src/pages/TaskInstances/TaskInstances.tsx | 29 ++- .../src/queries/useBulkPatchTaskInstances.ts | 171 ++++++++++++++++++ 4 files changed, 412 insertions(+), 3 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsDialog.tsx create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkPatchTaskInstances.ts diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsButton.tsx b/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsButton.tsx new file mode 100644 index 0000000000000..3f10d219fd96a --- /dev/null +++ b/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsButton.tsx @@ -0,0 +1,106 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useDisclosure } from "@chakra-ui/react"; +import type { TaskInstanceResponse, TaskInstanceState } from "openapi-gen/requests/types.gen"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { MdArrowDropDown } from "react-icons/md"; + +import { StateBadge } from "src/components/StateBadge"; +import { Menu, Tooltip } from "src/components/ui"; +import ActionButton from "src/components/ui/ActionButton"; + +import PatchTaskInstancesDialog from "./MarkTaskInstancesAsDialog"; + +type Props = { + readonly clearSelections: () => void; + readonly dagId: string; + readonly dagRunId: string; + readonly patchKeys: Array; +}; + +const MarkTaskInstancesAsButton = ({ clearSelections, dagId, dagRunId, patchKeys }: Props) => { + const { onClose, onOpen, open } = useDisclosure(); + const [selectedState, setSelectedState] = useState("success"); + + const allowedStates: Array = ["success", "failed"]; + + const { t: translate } = useTranslation(); + + if (patchKeys.length === 0) { + return undefined; + } + + const type = translate("common:taskInstance_other"); + const patchButtonText = translate("dags:runAndTaskActions.markAs.button", { type }); + + return ( + <> + + + } + text={patchButtonText} + variant="outline" + withText + /> + + + {allowedStates.map((menuState) => { + const content = translate( + `dags:runAndTaskActions.markAs.buttonTooltip.${menuState === "success" ? "success" : "failed"}`, + ); + + return ( + + { + setSelectedState(menuState); + onOpen(); + }} + value={menuState} + > + + {translate(`common:states.${menuState}`)} + + + + ); + })} + + + + + ); +}; + +export default MarkTaskInstancesAsButton; diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsDialog.tsx b/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsDialog.tsx new file mode 100644 index 0000000000000..f1b123ce92e27 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstances/MarkTaskInstancesAsDialog.tsx @@ -0,0 +1,109 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Button, Flex, Heading, VStack } from "@chakra-ui/react"; +import type { + TaskInstanceCollectionResponse, + TaskInstanceResponse, + TaskInstanceState, +} from "openapi-gen/requests/types.gen"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { ActionAccordion } from "src/components/ActionAccordion"; +import { StateBadge } from "src/components/StateBadge"; +import { Dialog } from "src/components/ui/Dialog"; +import { useBulkPatchTaskInstances } from "src/queries/useBulkPatchTaskInstances"; + +type Props = { + readonly clearSelections: () => void; + readonly dagId: string; + readonly dagRunId: string; + readonly onClose: () => void; + readonly open: boolean; + readonly patchKeys: Array; + readonly selectedState: TaskInstanceState; +}; + +const MarkTaskInstancesAsDialog = ({ + clearSelections, + dagId, + dagRunId, + onClose, + open, + patchKeys, + selectedState, +}: Props) => { + const [note, setNote] = useState(); + + const { isPending, patchTaskInstances } = useBulkPatchTaskInstances({ + dagId, + dagRunId, + onSuccessConfirm: () => { + clearSelections(); + onClose(); + }, + }); + const { t: translate } = useTranslation(); + + const affectedTasks = { + task_instances: patchKeys, + total_entries: patchKeys.length, + } as TaskInstanceCollectionResponse; + + const handlePatch = (state: TaskInstanceState) => { + const actionValue = state === "failed" ? "set_failed" : "set_success"; + + patchTaskInstances(patchKeys, actionValue, { note }); + onClose(); + }; + + return ( + + + + + + + {translate("dags:runAndTaskActions.markAs.title", { + state: selectedState, + type: translate("common:taskInstance_other"), + })} + : + {" "} + + + + + + + + + + + + + + + + ); +}; + +export default MarkTaskInstancesAsDialog; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index 2579cc83435d4..3a01757d5b300 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -36,6 +36,7 @@ import { type GetColumnsParams, useRowSelection } from "src/components/DataTable import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { MarkTaskInstanceAsButton } from "src/components/MarkAs"; +import MarkTaskInstancesAsButton from "src/components/MarkAs/TaskInstances/MarkTaskInstancesAsButton"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; @@ -386,9 +387,31 @@ export const TaskInstances = () => { 1 ? "taskInstance_other" : "taskInstance_one"}`, - )}`} + content={`${translate("dags:runAndTaskActions.markAs.button")} ${translate("taskInstance", { count: selectedRows.size })}`} + disabled={selectedRows.size === 0} + > + { + const [dagRunId, currentTaskId, mapIndex] = id.split(SEPARATOR); + + return data?.task_instances.find( + (ti) => + ti.dag_run_id === dagRunId && + ti.task_id === currentTaskId && + ti.map_index === Number(mapIndex ?? -1), + ); + }) + .filter(Boolean) as Array + } + /> + + { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const { t: translate } = useTranslation("common"); + + const onSuccess = async (responseData: { patch?: { errors: Array; success: Array } }) => { + const queryKeys = [ + [useTaskInstanceServiceGetTaskInstancesKey], + dagId === undefined || dagRunId === undefined + ? [] + : [UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId })], + ]; + + await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); + + if (responseData.patch) { + const { errors, success } = responseData.patch; + + if (Array.isArray(errors) && errors.length > 0) { + const apiError = errors[0] as { error: string }; + + setError({ + body: { detail: apiError.error }, + }); + } else if (Array.isArray(success) && success.length > 0) { + toaster.create({ + description: translate("bulkAction.patch.taskInstance.success.description", { + count: success.length, + keys: success.join(", "), + }), + title: translate("bulkAction.patch.taskInstance.success.title"), + type: "success", + }); + onSuccessConfirm(); + } + } + }; + + const onError = (_error: unknown) => { + setError(_error); + }; + + const getNewState = (action: string) => { + switch (action) { + case "set_failed": + return "failed" as const; + case "set_success": + return "success" as const; + default: + return undefined; + } + }; + + const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ + onError, + onSuccess, + }); + + const patchTaskInstances = ( + entities: Array, + selectedAction: string, + options: { + include_downstream?: boolean; + include_future?: boolean; + include_past?: boolean; + include_upstream?: boolean; + note?: string | null; + } = {}, + ) => { + const newState = getNewState(selectedAction); + + if (Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined) { + mutate({ + dagId, + dagRunId, + requestBody: { + actions: [ + { + action: "update", + entities: entities.map((ti) => ({ + include_downstream: options.include_downstream, + include_future: options.include_future, + include_past: options.include_past, + include_upstream: options.include_upstream, + map_index: ti.map_index, + new_state: newState, + note: options.note, + task_id: ti.task_id, + })), + }, + ], + }, + }); + } else { + // cross dag run + const groupedByDagRunTIs: Record> = {}; + + entities.forEach((ti) => { + (groupedByDagRunTIs[`${ti.dag_id}${SEPARATOR}${ti.dag_run_id}`] ??= []).push(ti); + }); + + Object.entries(groupedByDagRunTIs).forEach(([key, groupTIs]) => { + const [groupDagId, groupDagRunId] = key.split(SEPARATOR); + + if (groupDagId !== undefined && groupDagRunId !== undefined) { + mutate({ + dagId: groupDagId, + dagRunId: groupDagRunId, + requestBody: { + actions: [ + { + action: "update", + entities: groupTIs.map((ti) => ({ + include_downstream: options.include_downstream, + include_future: options.include_future, + include_past: options.include_past, + include_upstream: options.include_upstream, + map_index: ti.map_index, + new_state: newState, + note: options.note, + task_id: ti.task_id, + })), + }, + ], + }, + }); + } + }); + } + }; + + return { error, isPending, patchTaskInstances }; +}; From d599a72890b1ac189335b1b3e50b6ef862c1c599 Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Sun, 31 Aug 2025 18:32:41 +0800 Subject: [PATCH 5/8] Add incremental success msg --- .../src/queries/useBulkDeleteTaskInstances.ts | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts index 59b620fbe379a..b28ca72072014 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts @@ -17,7 +17,7 @@ * under the License. */ import { useQueryClient } from "@tanstack/react-query"; -import { useState } from "react"; +import { useState, useRef } from "react"; import { useTranslation } from "react-i18next"; import { @@ -40,6 +40,9 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } const queryClient = useQueryClient(); const [error, setError] = useState(undefined); const { t: translate } = useTranslation("common"); + const pendingOperationsRef = useRef(0); + const allSuccessesRef = useRef>([]); + const hasErrorsRef = useRef(false); const onSuccess = async (responseData: { delete?: { errors: Array; success: Array } }) => { const queryKeys = [ @@ -57,25 +60,40 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } if (Array.isArray(errors) && errors.length > 0) { const apiError = errors[0] as { error: string }; + hasErrorsRef.current = true; setError({ body: { detail: apiError.error }, }); } else if (Array.isArray(success) && success.length > 0) { + allSuccessesRef.current = [...allSuccessesRef.current, ...success]; + } + } + + pendingOperationsRef.current -= 1; + + if (pendingOperationsRef.current === 0 && !hasErrorsRef.current) { + // All operations completed successfully + if (allSuccessesRef.current.length > 0) { toaster.create({ description: translate("bulkAction.delete.taskInstance.success.description", { - count: success.length, - keys: success.join(", "), + count: allSuccessesRef.current.length, + keys: allSuccessesRef.current.join(", "), }), title: translate("bulkAction.delete.taskInstance.success.title"), type: "success", }); onSuccessConfirm(); } + // Reset state for next operation + allSuccessesRef.current = []; + hasErrorsRef.current = false; } }; const onError = (_error: unknown) => { setError(_error); + hasErrorsRef.current = true; + pendingOperationsRef.current -= 1; }; const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ @@ -84,7 +102,13 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } }); const deleteTaskInstances = (entities: Array) => { + // Reset state for new operation + allSuccessesRef.current = []; + hasErrorsRef.current = false; + setError(undefined); + if (Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined) { + pendingOperationsRef.current = 1; mutate({ dagId, dagRunId, @@ -105,6 +129,8 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } (groupedByDagRunTIs[`${ti.dag_id}${SEPARATOR}${ti.dag_run_id}`] ??= []).push(ti); }); + pendingOperationsRef.current = Object.keys(groupedByDagRunTIs).length; + Object.entries(groupedByDagRunTIs).forEach(([key, groupTIs]) => { const [groupDagId, groupDagRunId] = key.split(SEPARATOR); From 4071206ec1be6d83417ae299957f344625630487 Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Fri, 3 Oct 2025 00:00:26 +0800 Subject: [PATCH 6/8] Update warning to more detailed version --- airflow-core/src/airflow/ui/public/i18n/locales/en/common.json | 3 ++- .../src/airflow/ui/public/i18n/locales/zh-TW/common.json | 3 ++- .../ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index e9480299f94f4..eef5b4cd09df2 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -31,7 +31,8 @@ "success": { "description": "{{count}} task instances deleted successfully. Keys: {{keys}}", "title": "Delete Task Instances Request Successful" - } + }, + "warning": "This will remove all metadata related to the Task Instances, including task instance history and audit record." } } }, diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index 70065ca125597..4246cbd6d2b91 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -31,7 +31,8 @@ "success": { "description": "已成功刪除 {{count}} 個任務實例。鍵:{{keys}}", "title": "已提交刪除任務實例請求" - } + }, + "warning": "這將刪除所有與任務實例相關的系統資料,包括歷史記錄和審計日誌。" } } }, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx index 34692e27a23ad..5bf4e987eab34 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/DeleteTaskInstancesButton.tsx @@ -52,7 +52,7 @@ const DeleteTaskInstancesButton = ({ clearSelections, dagId, dagRunId, deleteKey const type = translate("common:taskInstance_other"); const title = translate("dags:runAndTaskActions.delete.dialog.title", { type }); - const warningText = translate("dags:runAndTaskActions.delete.dialog.warning", { type }); + const warningText = translate("common:bulkAction.delete.taskInstance.warning", { type }); const deleteButtonText = translate("dags:runAndTaskActions.delete.button", { type }); const affectedTasks = { From a5817429f9f3aea77c9826111452c87b2bcb3170 Mon Sep 17 00:00:00 2001 From: "Guan Ming(Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Fri, 3 Oct 2025 00:28:50 +0800 Subject: [PATCH 7/8] Remove unused eslint-disable --- .../src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index 3a01757d5b300..1123bbab5134b 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -1,5 +1,3 @@ -/* eslint-disable max-lines */ - /*! * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file From 9c0611ec8dc4237c0a56f555fac208035b809bda Mon Sep 17 00:00:00 2001 From: "Guan-Ming (Wesley) Chiu" <105915352+guan404ming@users.noreply.github.com> Date: Wed, 10 Dec 2025 18:03:09 +0800 Subject: [PATCH 8/8] Update to use wildcard supported endpoint --- .../src/pages/TaskInstances/TaskInstances.tsx | 2 +- .../src/queries/useBulkDeleteTaskInstances.ts | 92 +++++-------------- .../src/queries/useBulkPatchTaskInstances.ts | 87 ++++++------------ 3 files changed, 50 insertions(+), 131 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index 1123bbab5134b..33024755e32b8 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -21,7 +21,7 @@ import { Flex, Link } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import type { TFunction } from "i18next"; -import { useMemo, useMemo } from "react"; +import { useMemo } from "react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts index b28ca72072014..1f1cd9a4f82af 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteTaskInstances.ts @@ -17,7 +17,7 @@ * under the License. */ import { useQueryClient } from "@tanstack/react-query"; -import { useState, useRef } from "react"; +import { useState } from "react"; import { useTranslation } from "react-i18next"; import { @@ -34,15 +34,10 @@ type Props = { readonly onSuccessConfirm: VoidFunction; }; -const SEPARATOR = "SEPARATOR"; - export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm }: Props) => { const queryClient = useQueryClient(); const [error, setError] = useState(undefined); const { t: translate } = useTranslation("common"); - const pendingOperationsRef = useRef(0); - const allSuccessesRef = useRef>([]); - const hasErrorsRef = useRef(false); const onSuccess = async (responseData: { delete?: { errors: Array; success: Array } }) => { const queryKeys = [ @@ -60,40 +55,25 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } if (Array.isArray(errors) && errors.length > 0) { const apiError = errors[0] as { error: string }; - hasErrorsRef.current = true; setError({ body: { detail: apiError.error }, }); } else if (Array.isArray(success) && success.length > 0) { - allSuccessesRef.current = [...allSuccessesRef.current, ...success]; - } - } - - pendingOperationsRef.current -= 1; - - if (pendingOperationsRef.current === 0 && !hasErrorsRef.current) { - // All operations completed successfully - if (allSuccessesRef.current.length > 0) { toaster.create({ description: translate("bulkAction.delete.taskInstance.success.description", { - count: allSuccessesRef.current.length, - keys: allSuccessesRef.current.join(", "), + count: success.length, + keys: success.join(", "), }), title: translate("bulkAction.delete.taskInstance.success.title"), type: "success", }); onSuccessConfirm(); } - // Reset state for next operation - allSuccessesRef.current = []; - hasErrorsRef.current = false; } }; const onError = (_error: unknown) => { setError(_error); - hasErrorsRef.current = true; - pendingOperationsRef.current -= 1; }; const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ @@ -102,54 +82,28 @@ export const useBulkDeleteTaskInstances = ({ dagId, dagRunId, onSuccessConfirm } }); const deleteTaskInstances = (entities: Array) => { - // Reset state for new operation - allSuccessesRef.current = []; - hasErrorsRef.current = false; setError(undefined); - if (Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined) { - pendingOperationsRef.current = 1; - mutate({ - dagId, - dagRunId, - requestBody: { - actions: [ - { - action: "delete", - entities: entities.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), - }, - ], - }, - }); - } else { - // cross dag run - const groupedByDagRunTIs: Record> = {}; - - entities.forEach((ti) => { - (groupedByDagRunTIs[`${ti.dag_id}${SEPARATOR}${ti.dag_run_id}`] ??= []).push(ti); - }); - - pendingOperationsRef.current = Object.keys(groupedByDagRunTIs).length; - - Object.entries(groupedByDagRunTIs).forEach(([key, groupTIs]) => { - const [groupDagId, groupDagRunId] = key.split(SEPARATOR); - - if (groupDagId !== undefined && groupDagRunId !== undefined) { - mutate({ - dagId: groupDagId, - dagRunId: groupDagRunId, - requestBody: { - actions: [ - { - action: "delete", - entities: groupTIs.map((ti) => ({ map_index: ti.map_index, task_id: ti.task_id })), - }, - ], - }, - }); - } - }); - } + const isSingleDagRun = + Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined; + + mutate({ + dagId: isSingleDagRun ? dagId : "~", + dagRunId: isSingleDagRun ? dagRunId : "~", + requestBody: { + actions: [ + { + action: "delete", + entities: entities.map((ti) => ({ + dag_id: isSingleDagRun ? undefined : ti.dag_id, + dag_run_id: isSingleDagRun ? undefined : ti.dag_run_id, + map_index: ti.map_index, + task_id: ti.task_id, + })), + }, + ], + }, + }); }; return { deleteTaskInstances, error, isPending }; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkPatchTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkPatchTaskInstances.ts index 92bfba7bd471d..25843ece60fb9 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkPatchTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkPatchTaskInstances.ts @@ -34,8 +34,6 @@ type Props = { readonly onSuccessConfirm: VoidFunction; }; -const SEPARATOR = "SEPARATOR"; - export const useBulkPatchTaskInstances = ({ dagId, dagRunId, onSuccessConfirm }: Props) => { const queryClient = useQueryClient(); const [error, setError] = useState(undefined); @@ -106,65 +104,32 @@ export const useBulkPatchTaskInstances = ({ dagId, dagRunId, onSuccessConfirm }: } = {}, ) => { const newState = getNewState(selectedAction); - - if (Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined) { - mutate({ - dagId, - dagRunId, - requestBody: { - actions: [ - { - action: "update", - entities: entities.map((ti) => ({ - include_downstream: options.include_downstream, - include_future: options.include_future, - include_past: options.include_past, - include_upstream: options.include_upstream, - map_index: ti.map_index, - new_state: newState, - note: options.note, - task_id: ti.task_id, - })), - }, - ], - }, - }); - } else { - // cross dag run - const groupedByDagRunTIs: Record> = {}; - - entities.forEach((ti) => { - (groupedByDagRunTIs[`${ti.dag_id}${SEPARATOR}${ti.dag_run_id}`] ??= []).push(ti); - }); - - Object.entries(groupedByDagRunTIs).forEach(([key, groupTIs]) => { - const [groupDagId, groupDagRunId] = key.split(SEPARATOR); - - if (groupDagId !== undefined && groupDagRunId !== undefined) { - mutate({ - dagId: groupDagId, - dagRunId: groupDagRunId, - requestBody: { - actions: [ - { - action: "update", - entities: groupTIs.map((ti) => ({ - include_downstream: options.include_downstream, - include_future: options.include_future, - include_past: options.include_past, - include_upstream: options.include_upstream, - map_index: ti.map_index, - new_state: newState, - note: options.note, - task_id: ti.task_id, - })), - }, - ], - }, - }); - } - }); - } + const isSingleDagRun = + Boolean(dagId) && Boolean(dagRunId) && dagId !== undefined && dagRunId !== undefined; + + mutate({ + dagId: isSingleDagRun ? dagId : "~", + dagRunId: isSingleDagRun ? dagRunId : "~", + requestBody: { + actions: [ + { + action: "update", + entities: entities.map((ti) => ({ + dag_id: isSingleDagRun ? undefined : ti.dag_id, + dag_run_id: isSingleDagRun ? undefined : ti.dag_run_id, + include_downstream: options.include_downstream, + include_future: options.include_future, + include_past: options.include_past, + include_upstream: options.include_upstream, + map_index: ti.map_index, + new_state: newState, + note: options.note, + task_id: ti.task_id, + })), + }, + ], + }, + }); }; return { error, isPending, patchTaskInstances };