From 4b470920505fd8127a29e440881b8bcc441ce04f Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Mon, 23 Mar 2026 18:54:37 +0000 Subject: [PATCH 1/7] group affected tasks by dag run id --- .../ui/public/i18n/locales/en/common.json | 3 +- .../ActionAccordion/ActionAccordion.tsx | 83 ++++++++++++++++--- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 4176a91506254..b5b3b0ce14b23 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -354,5 +354,6 @@ "tooltip": "Press {{hotkey}} to toggle wrap", "unwrap": "Unwrap", "wrap": "Wrap" - } + }, + "selected": "Selected" } diff --git a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx index e2f5b274a4c10..8c0373b0f1879 100644 --- a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx +++ b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx @@ -20,7 +20,7 @@ import { Box, Editable, Text, VStack } from "@chakra-ui/react"; import type { ChangeEvent } from "react"; import { useTranslation } from "react-i18next"; -import type { DAGRunResponse, TaskInstanceCollectionResponse } from "openapi/requests/types.gen"; +import type { DAGRunResponse, TaskInstanceCollectionResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; import ReactMarkdown from "src/components/ReactMarkdown"; import { Accordion } from "src/components/ui"; @@ -29,17 +29,59 @@ import { getColumns } from "./columns"; type Props = { readonly affectedTasks?: TaskInstanceCollectionResponse; + readonly groupByRunId?: boolean; readonly note: DAGRunResponse["note"]; readonly setNote: (value: string) => void; }; +const TasksTable = ({ + noRowsMessage, + tasks, +}: { + noRowsMessage: string; + tasks: Array; +}) => { + const { t: translate } = useTranslation(); + const columns = getColumns(translate); + + return ( + + ); +}; + // Table is in memory, pagination and sorting are disabled. // TODO: Make a front-end only unconnected table component with client side ordering and pagination -const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => { +const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote }: Props) => { const showTaskSection = affectedTasks !== undefined; const { t: translate } = useTranslation(); - const columns = getColumns(translate); + // Group task instances by dag_run_id when requested + const runGroups = (() => { + if (!groupByRunId || !affectedTasks) { + return null; + } + + const map = new Map>(); + + for (const ti of affectedTasks.task_instances) { + const group = map.get(ti.dag_run_id) ?? []; + + group.push(ti); + map.set(ti.dag_run_id, group); + } + + return map; + })(); + + // Only group when there are actually multiple run IDs + const shouldGroup = groupByRunId && runGroups !== null && runGroups.size > 1; return ( { - + {shouldGroup && runGroups !== null ? ( + + {[...runGroups.entries()].map(([runId, tis]) => ( + + + + {translate("runId")}: {runId}{" "} + + ({tis.length}) + + + + + + + + ))} + + ) : ( + + )} From 5099277cef9c2e1c25461a1c0d237a85377d24ed Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Mon, 23 Mar 2026 18:56:00 +0000 Subject: [PATCH 2/7] add bulk ti query and mutation hooks --- .../ui/src/queries/useBulkClearDryRun.ts | 113 +++++++++++++++++ .../src/queries/useBulkClearTaskInstances.ts | 120 ++++++++++++++++++ .../ui/src/queries/useBulkMarkAsDryRun.ts | 107 ++++++++++++++++ .../ui/src/queries/useBulkTaskInstances.ts | 97 ++++++++++++++ 4 files changed, 437 insertions(+) create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts new file mode 100644 index 0000000000000..e631860f300c7 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts @@ -0,0 +1,113 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueries } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; + +type Options = { + includeDownstream: boolean; + includeFuture: boolean; + includeOnlyFailed: boolean; + includePast: boolean; + includeUpstream: boolean; +}; + +const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], total_entries: 0 }; + +export const useBulkClearDryRunKey = "bulkClearDryRun"; + +export const useBulkClearDryRun = ( + enabled: boolean, + selectedTaskInstances: Array, + options: Options, +) => { + const byDagRun = useMemo(() => { + const groups = new Map }>(); + + for (const ti of selectedTaskInstances) { + const key = `${ti.dag_id}::${ti.dag_run_id}`; + const group = groups.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] }; + + group.tis.push(ti); + groups.set(key, group); + } + + return [...groups.values()]; + }, [selectedTaskInstances]); + + const results = useQueries({ + queries: byDagRun.map(({ dagId, dagRunId, tis }) => ({ + enabled, + queryFn: () => + TaskInstanceService.postClearTaskInstances({ + dagId, + requestBody: { + dag_run_id: dagRunId, + dry_run: true, + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + only_failed: options.includeOnlyFailed, + task_ids: tis.map((ti) => + ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id, + ), + }, + }), + queryKey: [ + useBulkClearDryRunKey, + dagId, + dagRunId, + { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_only_failed: options.includeOnlyFailed, + include_past: options.includePast, + include_upstream: options.includeUpstream, + task_ids: tis.map((ti) => `${ti.task_id}:${ti.map_index}`), + }, + ], + refetchOnMount: "always" as const, + })), + }); + + const isFetching = results.some((result) => result.isFetching); + + const data = useMemo(() => { + const seen = new Set(); + const merged: Array = []; + + for (const result of results) { + for (const ti of result.data?.task_instances ?? []) { + const key = `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + + if (!seen.has(key)) { + seen.add(key); + merged.push(ti); + } + } + } + + return merged.length === 0 ? EMPTY : { task_instances: merged, total_entries: merged.length }; + }, [results]); + + return { data, isFetching }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts new file mode 100644 index 0000000000000..79b1bcc242535 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts @@ -0,0 +1,120 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export type BulkClearOptions = { + includeDownstream: boolean; + includeFuture: boolean; + includeOnlyFailed: boolean; + includePast: boolean; + includeUpstream: boolean; + preventRunningTask: boolean; +}; + +export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const [isPending, setIsPending] = useState(false); + const { t: translate } = useTranslation(["common", "dags"]); + + const invalidateQueries = async () => { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + ]); + }; + + const bulkClear = async (taskInstances: Array, options: BulkClearOptions) => { + setError(undefined); + setIsPending(true); + + // Group by (dag_id, dag_run_id) — clear endpoint requires a specific dag_id + // and dag_run_id scopes the clear to the specific run + const byDagRun = new Map }>(); + + for (const ti of taskInstances) { + const key = `${ti.dag_id}::${ti.dag_run_id}`; + const group = byDagRun.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] }; + + group.tis.push(ti); + byDagRun.set(key, group); + } + + try { + await Promise.all( + [...byDagRun.values()].map(({ dagId, dagRunId, tis }) => + TaskInstanceService.postClearTaskInstances({ + dagId, + requestBody: { + dag_run_id: dagRunId, + dry_run: false, + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + only_failed: options.includeOnlyFailed, + ...(options.preventRunningTask ? { prevent_running_task: true } : {}), + task_ids: tis.map((ti) => + ti.map_index >= 0 + ? ([ti.task_id, ti.map_index] as [string, number]) + : ti.task_id, + ), + }, + }), + ), + ); + + await invalidateQueries(); + + toaster.create({ + description: translate("toaster.bulkDelete.success.description", { + count: taskInstances.length, + keys: taskInstances.map((ti) => ti.task_id).join(", "), + resourceName: translate("taskInstance_other"), + }), + title: translate("toaster.bulkDelete.success.title"), + type: "success", + }); + + clearSelections(); + onSuccessConfirm(); + } catch (_error) { + setError(_error); + } finally { + setIsPending(false); + } + }; + + return { bulkClear, error, isPending, setError }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts new file mode 100644 index 0000000000000..23ef8ce6e7e29 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts @@ -0,0 +1,107 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueries } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { TaskInstanceService } from "openapi/requests/services.gen"; +import type { + TaskInstanceCollectionResponse, + TaskInstanceResponse, + TaskInstanceState, +} from "openapi/requests/types.gen"; + +type Options = { + includeDownstream: boolean; + includeFuture: boolean; + includePast: boolean; + includeUpstream: boolean; +}; + +const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], total_entries: 0 }; + +export const useBulkMarkAsDryRunKey = "bulkMarkAsDryRun"; + +export const useBulkMarkAsDryRun = ( + enabled: boolean, + selectedTaskInstances: Array, + targetState: TaskInstanceState, + options: Options, +) => { + const affectedInstances = useMemo( + () => selectedTaskInstances.filter((ti) => ti.state !== targetState), + [selectedTaskInstances, targetState], + ); + + const results = useQueries({ + queries: affectedInstances.map((ti) => ({ + enabled, + queryFn: () => + TaskInstanceService.patchTaskInstanceDryRun({ + dagId: ti.dag_id, + dagRunId: ti.dag_run_id, + mapIndex: ti.map_index, + requestBody: { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + new_state: targetState, + }, + taskId: ti.task_id, + }), + queryKey: [ + useBulkMarkAsDryRunKey, + ti.dag_id, + ti.dag_run_id, + ti.task_id, + ti.map_index, + { + include_downstream: options.includeDownstream, + include_future: options.includeFuture, + include_past: options.includePast, + include_upstream: options.includeUpstream, + new_state: targetState, + }, + ], + refetchOnMount: "always" as const, + })), + }); + + const isFetching = results.some((result) => result.isFetching); + + const data = useMemo(() => { + const seen = new Set(); + const merged: Array = []; + + for (const result of results) { + for (const ti of result.data?.task_instances ?? []) { + const key = `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + + if (!seen.has(key)) { + seen.add(key); + merged.push(ti); + } + } + } + + return merged.length === 0 ? EMPTY : { task_instances: merged, total_entries: merged.length }; + }, [results]); + + return { data, isFetching }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts new file mode 100644 index 0000000000000..ebd236674cd20 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts @@ -0,0 +1,97 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceBulkTaskInstances, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import type { BulkActionResponse, BulkBody_BulkTaskInstanceBody_, BulkResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +const handleActionResult = ( + actionResult: BulkActionResponse, + setError: (error: unknown) => void, + onSuccess: (count: number, keys: Array) => void, +) => { + const { errors, success } = actionResult; + + if (Array.isArray(errors) && errors.length > 0) { + const apiError = errors[0] as { error: string }; + + setError({ body: { detail: apiError.error } }); + } else if (Array.isArray(success) && success.length > 0) { + onSuccess(success.length, success); + } +}; + +export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const { t: translate } = useTranslation(["common", "dags"]); + + const onSuccess = async (responseData: BulkResponse) => { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + ]); + + const actionResult = responseData.delete ?? responseData.update; + + if (actionResult) { + handleActionResult(actionResult, setError, (count, keys) => { + toaster.create({ + description: translate("toaster.bulkDelete.success.description", { + count, + keys: keys.join(", "), + resourceName: translate("taskInstance_other"), + }), + title: translate("toaster.bulkDelete.success.title"), + type: "success", + }); + clearSelections(); + onSuccessConfirm(); + }); + } + }; + + const onError = (_error: unknown) => { + setError(_error); + }; + + const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({ + onError, + onSuccess, + }); + + const bulkAction = (requestBody: BulkBody_BulkTaskInstanceBody_) => { + setError(undefined); + mutate({ dagId: "~", dagRunId: "~", requestBody }); + }; + + return { bulkAction, error, isPending, setError }; +}; From 9b27aa0fc33ff5b9c302e4491fc58faad5335fd7 Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Mon, 23 Mar 2026 18:57:36 +0000 Subject: [PATCH 3/7] add bulk action buttons for ti --- .../BulkClearTaskInstancesButton.tsx | 157 +++++++++++++ .../BulkDeleteTaskInstancesButton.tsx | 154 +++++++++++++ .../BulkMarkTaskInstancesAsButton.tsx | 206 ++++++++++++++++++ 3 files changed, 517 insertions(+) create mode 100644 airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx new file mode 100644 index 0000000000000..312b3c59b9d68 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx @@ -0,0 +1,157 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Button, Flex, Heading, VStack, useDisclosure } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { CgRedo } from "react-icons/cg"; + +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Checkbox, Dialog } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkClearTaskInstances } from "src/queries/useBulkClearTaskInstances"; + +import { useBulkClearDryRun } from "src/queries/useBulkClearDryRun"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkClearTaskInstancesButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [selectedOptions, setSelectedOptions] = useState>(["downstream"]); + const [note, setNote] = useState(null); + const [preventRunningTask, setPreventRunningTask] = useState(true); + const { bulkClear, error, isPending } = useBulkClearTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const handleClose = () => { + setNote(null); + onClose(); + }; + + const past = selectedOptions.includes("past"); + const future = selectedOptions.includes("future"); + const upstream = selectedOptions.includes("upstream"); + const downstream = selectedOptions.includes("downstream"); + const onlyFailed = selectedOptions.includes("onlyFailed"); + + const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date !== null); + + const { data: affectedTasks, isFetching } = useBulkClearDryRun(open, selectedTaskInstances, { + includeDownstream: downstream, + includeFuture: future, + includeOnlyFailed: onlyFailed, + includePast: past, + includeUpstream: upstream, + }); + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.clear.title", { + type: translate("taskInstance_other"), + })} + + + + + + + + + + + + + setPreventRunningTask(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.preventRunningTasks")} + + + + + + + + ); +}; + +export default BulkClearTaskInstancesButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx new file mode 100644 index 0000000000000..6823d43e9d50b --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx @@ -0,0 +1,154 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { DataTable } from "src/components/DataTable"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Accordion, Dialog } from "src/components/ui"; +import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances"; + +import { getColumns } from "src/components/ActionAccordion/columns"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkDeleteTaskInstancesButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const { bulkAction, error, isPending } = useBulkTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const columns = getColumns(translate); + + // Group by dag_run_id for display + const byRunId = new Map>(); + + for (const ti of selectedTaskInstances) { + const group = byRunId.get(ti.dag_run_id) ?? []; + + group.push(ti); + byRunId.set(ti.dag_run_id, group); + } + + const isGrouped = byRunId.size > 1; + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.title", { + type: translate("taskInstance_other"), + })} + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.warning", { + type: translate("taskInstance_other"), + })} + + + + {isGrouped ? ( + + {[...byRunId.entries()].map(([runId, tis]) => ( + + + + {translate("runId")}: {runId}{" "} + + ({tis.length}) + + + + + + + + ))} + + ) : ( + + )} + + + + + + + + + + + ); +}; + +export default BulkDeleteTaskInstancesButton; diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx new file mode 100644 index 0000000000000..a981177f41f75 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx @@ -0,0 +1,206 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Box, Button, Flex, Heading, HStack, VStack, useDisclosure } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { FiX } from "react-icons/fi"; +import { LuCheck } from "react-icons/lu"; + +import type { TaskInstanceResponse, TaskInstanceState } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { allowedStates } from "src/components/MarkAs/utils"; +import { StateBadge } from "src/components/StateBadge"; +import { Dialog, Menu } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances"; + +import { useBulkMarkAsDryRun } from "src/queries/useBulkMarkAsDryRun"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedTaskInstances: Array; +}; + +const BulkMarkTaskInstancesAsButton = ({ clearSelections, selectedTaskInstances }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [state, setState] = useState("success"); + const [selectedOptions, setSelectedOptions] = useState>([]); + const [note, setNote] = useState(null); + const { bulkAction, error, isPending, setError } = useBulkTaskInstances({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const past = selectedOptions.includes("past"); + const future = selectedOptions.includes("future"); + const upstream = selectedOptions.includes("upstream"); + const downstream = selectedOptions.includes("downstream"); + + const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date !== null); + + const affectedCount = (targetState: TaskInstanceState) => + selectedTaskInstances.filter((ti) => ti.state !== targetState).length; + + const { data: affectedTasks, isFetching } = useBulkMarkAsDryRun(open, selectedTaskInstances, state, { + includeDownstream: downstream, + includeFuture: future, + includePast: past, + includeUpstream: upstream, + }); + + const handleOpen = (newState: TaskInstanceState) => { + setState(newState); + setSelectedOptions([]); + setNote(null); + setError(undefined); + onOpen(); + }; + + const directlyAffected = selectedTaskInstances.filter((ti) => ti.state !== state); + + return ( + + + +
+ +
+
+ + {allowedStates.map((menuState) => { + const count = affectedCount(menuState); + + return ( + { + if (count > 0) { + handleOpen(menuState); + } + }} + value={menuState} + > + + + {translate(`common:states.${menuState}`)} + + + {count} + + + + ); + })} + +
+ + + + + + + {translate("dags:runAndTaskActions.markAs.title", { + state, + type: translate("taskInstance_other"), + })}{" "} + + + + + + + + + + + + + + + + + + +
+ ); +}; + +export default BulkMarkTaskInstancesAsButton; From 354bbd221ea880247cdc7aa09bdd158a3b499a14 Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Mon, 23 Mar 2026 18:58:54 +0000 Subject: [PATCH 4/7] add row selection and bulk action bar --- .../src/pages/TaskInstances/TaskInstances.tsx | 360 +++++++++++------- 1 file changed, 218 insertions(+), 142 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index c63cffd54c193..7e5374517bcfa 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -29,21 +29,30 @@ import type { TaskInstanceResponse } from "openapi/requests/types.gen"; import { ClearTaskInstanceButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; +import { useRowSelection, type GetColumnsParams } from "src/components/DataTable/useRowSelection"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { MarkTaskInstanceAsButton } from "src/components/MarkAs"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; +import { ActionBar } from "src/components/ui/ActionBar"; +import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAutoRefresh, isStatePending, renderDuration } from "src/utils"; import { getTaskInstanceLink } from "src/utils/links"; +import BulkClearTaskInstancesButton from "./BulkClearTaskInstancesButton"; +import BulkDeleteTaskInstancesButton from "./BulkDeleteTaskInstancesButton"; +import BulkMarkTaskInstancesAsButton from "./BulkMarkTaskInstancesAsButton"; import DeleteTaskInstanceButton from "./DeleteTaskInstanceButton"; import { TaskInstancesFilter } from "./TaskInstancesFilter"; type TaskInstanceRow = { row: { original: TaskInstanceResponse } }; +const getRowKey = (ti: TaskInstanceResponse) => + `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`; + const { DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, DAG_VERSION: DAG_VERSION_PARAM, @@ -63,161 +72,192 @@ const { TRY_NUMBER: TRY_NUMBER_PARAM, }: SearchParamsKeysType = SearchParamsKeys; +type ColumnProps = { + readonly dagId?: string; + readonly runId?: string; + readonly taskId?: string; + readonly translate: TFunction; +}; + const taskInstanceColumns = ({ + allRowsSelected, dagId, + onRowSelect, + onSelectAll, runId, + selectedRows, taskId, translate, -}: { - dagId?: string; - runId?: string; - taskId?: string; - translate: TFunction; -}): Array> => [ - ...(Boolean(dagId) - ? [] - : [ - { - accessorKey: "dag_display_name", - cell: ({ row: { original } }: TaskInstanceRow) => ( - - - - - - ), - enableSorting: false, - header: translate("dagId"), - }, - ]), - ...(Boolean(runId) - ? [] - : [ - { - accessorKey: "run_after", - // If we don't show the taskId column, make the dag run a link to the task instance - cell: ({ row: { original } }: TaskInstanceRow) => - Boolean(taskId) ? ( +}: ColumnProps & GetColumnsParams): Array> => { + return [ + { + accessorKey: "select", + cell: ({ row }) => ( + onRowSelect(getRowKey(row.original), Boolean(event.checked))} + /> + ), + enableHiding: false, + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, + ...(Boolean(dagId) + ? [] + : [ + { + accessorKey: "dag_display_name", + cell: ({ row: { original } }: TaskInstanceRow) => ( + + + + + + ), + enableSorting: false, + header: translate("dagId"), + }, + ]), + ...(Boolean(runId) + ? [] + : [ + { + accessorKey: "run_after", + cell: ({ row: { original } }: TaskInstanceRow) => + Boolean(taskId) ? ( + + + + + ) : ( +