From 706d7db1664fcfd7e7955c771652839e89464283 Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Thu, 7 May 2026 23:51:33 +0900 Subject: [PATCH 01/11] UI: Add multi-select bulk actions for Dag Runs --- .../ui/src/pages/BulkClearDagRunsButton.tsx | 147 +++++++++++ .../ui/src/pages/BulkDeleteDagRunsButton.tsx | 101 +++++++ .../ui/src/pages/BulkMarkDagRunsAsButton.tsx | 139 ++++++++++ .../src/airflow/ui/src/pages/DagRuns.test.tsx | 247 +++++++++++++++++- .../src/airflow/ui/src/pages/DagRuns.tsx | 82 +++++- .../airflow/ui/src/queries/useBulkDagRuns.ts | 200 ++++++++++++++ 6 files changed, 911 insertions(+), 5 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx create mode 100644 airflow-core/src/airflow/ui/src/queries/useBulkDagRuns.ts diff --git a/airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx new file mode 100644 index 0000000000000..2b53c06e5fa8f --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx @@ -0,0 +1,147 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { CgRedo } from "react-icons/cg"; + +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Checkbox, Dialog } from "src/components/ui"; +import SegmentedControl from "src/components/ui/SegmentedControl"; +import { useBulkDagRuns } from "src/queries/useBulkDagRuns"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const BulkClearDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [selectedOptions, setSelectedOptions] = useState>(["existingTasks"]); + const [note, setNote] = useState(null); + const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); + const { bulkClear, error, isPending } = useBulkDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const onlyFailed = selectedOptions.includes("onlyFailed"); + const onlyNew = selectedOptions.includes("newTasks"); + + const handleClose = () => { + setNote(null); + onClose(); + }; + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.clear.title", { + type: translate("dagRun_other"), + })} + + + + + + + + + + + + {selectedDagRuns.length} {translate("dagRun_other")} + + + + {selectedDagRuns.map((dagRun) => ( + + + {dagRun.dag_id} + {" "} + / {dagRun.dag_run_id} + + ))} + + + + + + + setRunOnLatestVersion(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.runOnLatestVersion")} + + + + + + + + ); +}; + +export default BulkClearDagRunsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx new file mode 100644 index 0000000000000..360fb9f0b6e5b --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx @@ -0,0 +1,101 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import type { DAGRunResponse } from "openapi/requests/types.gen"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Dialog } from "src/components/ui"; +import { useBulkDagRuns } from "src/queries/useBulkDagRuns"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const { bulkDelete, error, isPending } = useBulkDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + return ( + <> + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.title", { + type: translate("dagRun_other"), + })} + + + + + + + + {translate("dags:runAndTaskActions.delete.dialog.warning", { + type: translate("dagRun_other"), + })} + + + + + {selectedDagRuns.map((dagRun) => ( + + + {dagRun.dag_id} + {" "} + / {dagRun.dag_run_id} + + ))} + + + + + + + + + + + + ); +}; + +export default BulkDeleteDagRunsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx b/airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx new file mode 100644 index 0000000000000..8dae011aa6f80 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx @@ -0,0 +1,139 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Box, Button, Flex, Heading, HStack, useDisclosure, VStack } from "@chakra-ui/react"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; +import { FiX } from "react-icons/fi"; +import { LuCheck } from "react-icons/lu"; + +import type { DAGRunPatchStates, DAGRunResponse } from "openapi/requests/types.gen"; +import { ActionAccordion } from "src/components/ActionAccordion"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { allowedStates } from "src/components/MarkAs/utils"; +import { StateBadge } from "src/components/StateBadge"; +import { Dialog, Menu } from "src/components/ui"; +import { useBulkDagRuns } from "src/queries/useBulkDagRuns"; + +type Props = { + readonly clearSelections: VoidFunction; + readonly selectedDagRuns: Array; +}; + +const BulkMarkDagRunsAsButton = ({ clearSelections, selectedDagRuns }: Props) => { + const { t: translate } = useTranslation(); + const { onClose, onOpen, open } = useDisclosure(); + const [state, setState] = useState("success"); + const [note, setNote] = useState(null); + const { bulkMarkAs, error, isPending, setError } = useBulkDagRuns({ + clearSelections, + onSuccessConfirm: onClose, + }); + + const affectedCount = (targetState: DAGRunPatchStates) => + selectedDagRuns.filter((dr) => dr.state !== targetState).length; + + const handleOpen = (newState: DAGRunPatchStates) => { + setState(newState); + setNote(null); + setError(undefined); + onOpen(); + }; + + const directlyAffected = selectedDagRuns.filter((dr) => dr.state !== state); + + return ( + + + +
+ +
+
+ + {allowedStates.map((menuState) => { + const count = affectedCount(menuState); + + return ( + { + if (count > 0) { + handleOpen(menuState); + } + }} + value={menuState} + > + + {translate(`common:states.${menuState}`)} + + {count} + + + + ); + })} + +
+ + + + + + + {translate("dags:runAndTaskActions.markAs.title", { + state, + type: translate("dagRun_other"), + })}{" "} + + + + + + + + + + + + + + + +
+ ); +}; + +export default BulkMarkDagRunsAsButton; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx index 50d1d9d8e6a61..3d1a2e853cb91 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx @@ -17,11 +17,24 @@ * under the License. */ import "@testing-library/jest-dom"; -import { render, screen, waitFor } from "@testing-library/react"; -import { describe, expect, it } from "vitest"; +import { fireEvent, render, screen, waitFor, within } from "@testing-library/react"; +import { http, HttpResponse } from "msw"; +import { setupServer, type SetupServerApi } from "msw/node"; +import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; +import { handlers } from "src/mocks/handlers"; import { AppWrapper } from "src/utils/AppWrapper"; +let server: SetupServerApi; + +beforeAll(() => { + server = setupServer(...handlers); + server.listen({ onUnhandledRequest: "bypass" }); +}); + +afterEach(() => server.resetHandlers()); +afterAll(() => server.close()); + // The dag_runs mock handler (see src/mocks/handlers/dag_runs.ts) returns: // - run_before_filter (logical_date: 2024-12-31) — excluded when filtering Jan 2025 // - run_in_range (logical_date: 2025-01-15) — included when filtering Jan 2025 @@ -46,3 +59,233 @@ describe("DagRuns logical date filter", () => { expect(screen.queryByText("run_before_filter")).not.toBeInTheDocument(); }); }); + +describe("DagRuns row selection", () => { + it("renders a select checkbox per row and reveals the action bar on selection", async () => { + render(); + + await waitFor(() => expect(screen.getByText("run_in_range")).toBeInTheDocument()); + + // Each data row carries a leading checkbox; clicking it should expose the + // bulk-action ActionBar (signalled by the "1 Selected" counter). + const runRow = screen.getByText("run_in_range").closest("tr"); + + expect(runRow).not.toBeNull(); + const [rowCheckbox] = within(runRow as HTMLElement).getAllByRole("checkbox"); + + expect(rowCheckbox).toBeDefined(); + fireEvent.click(rowCheckbox as HTMLElement); + + await waitFor(() => expect(screen.getByText(/1\s+Selected/iu)).toBeInTheDocument()); + }); +}); + +const selectRow = async (runText: string) => { + await waitFor(() => expect(screen.getByText(runText)).toBeInTheDocument()); + const row = screen.getByText(runText).closest("tr"); + + if (row === null) { + throw new Error(`Row for ${runText} not found`); + } + const [checkbox] = within(row).getAllByRole("checkbox"); + + if (checkbox === undefined) { + throw new Error(`Checkbox in row for ${runText} not found`); + } + fireEvent.click(checkbox); + + return row; +}; + +// Per-row buttons are Chakra IconButtons whose accessible name comes from +// `aria-label` and whose textContent only contains icon glyphs / single-char +// separators. The bulk-action buttons in the ActionBar render the translated +// label as a visible text node. We pick the button whose textContent itself +// matches the label regex, which is independent of locale state (the regex +// matches the i18n key under tests, the translated string in production). +const findBulkActionButton = (label: RegExp) => + screen + .getAllByRole("button", { name: label }) + .find((btn) => label.test(btn.textContent)); + +describe("DagRuns bulk delete", () => { + it("fires one DELETE per selected run and closes the dialog on success", async () => { + const onDelete = vi.fn(); + + server.use( + http.delete("/api/v2/dags/:dagId/dagRuns/:dagRunId", ({ params }) => { + onDelete(params); + + return new HttpResponse(null, { status: 204 }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + const bulkDeleteBtn = findBulkActionButton(/delete/iu); + + expect(bulkDeleteBtn).toBeDefined(); + fireEvent.click(bulkDeleteBtn as HTMLElement); + + // Chakra's ActionBar.Root itself has role="dialog", so wait for the + // confirm dialog (the second one) to mount and pick it explicitly. + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + // The dialog renders each selected run as ` / `; match on + // the run-id substring (regex matchers do partial matching on normalized + // text content, regardless of element splits). + expect(within(confirmDialog).getByText(/run_in_range/u)).toBeInTheDocument(); + expect(within(confirmDialog).getByText(/run_before_filter/u)).toBeInTheDocument(); + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onDelete).toHaveBeenCalledTimes(2)); + expect(onDelete).toHaveBeenCalledWith( + expect.objectContaining({ dagId: "test_dag", dagRunId: "run_in_range" }), + ); + expect(onDelete).toHaveBeenCalledWith( + expect.objectContaining({ dagId: "test_dag", dagRunId: "run_before_filter" }), + ); + + // Dialog should auto-close once both deletions resolve successfully. + await waitFor(() => expect(screen.queryByRole("dialog")).not.toBeInTheDocument()); + }); + + it("keeps the dialog open and surfaces an error on partial failure", async () => { + server.use( + http.delete("/api/v2/dags/:dagId/dagRuns/:dagRunId", ({ params }) => + params.dagRunId === "run_in_range" + ? new HttpResponse(null, { status: 204 }) + : HttpResponse.json({ detail: "boom" }, { status: 500 }), + ), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + const bulkDeleteBtn = findBulkActionButton(/delete/iu); + + expect(bulkDeleteBtn).toBeDefined(); + fireEvent.click(bulkDeleteBtn as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + // Confirm dialog stays open and shows the rejection — no auto-close + // because one request failed; the partial success is still reported via + // the toaster (not asserted here, since the Toaster portal lives outside + // AppWrapper). + await waitFor(() => expect(within(confirmDialog).getByText(/boom/iu)).toBeInTheDocument()); + expect(confirmDialog).toBeInTheDocument(); + }); +}); + +describe("DagRuns bulk clear", () => { + it("fires one /clear per selected run with the dialog options", async () => { + const onClear = vi.fn<(params: Record, body: unknown) => void>(); + + server.use( + http.post("/api/v2/dags/:dagId/dagRuns/:dagRunId/clear", async ({ params, request }) => { + const body: unknown = await request.json(); + + onClear(params as Record, body); + + return HttpResponse.json({ dag_id: params.dagId, dag_run_id: params.dagRunId }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + const bulkClearBtn = findBulkActionButton(/clear/iu); + + expect(bulkClearBtn).toBeDefined(); + fireEvent.click(bulkClearBtn as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onClear).toHaveBeenCalledTimes(2)); + expect(onClear).toHaveBeenCalledWith( + expect.objectContaining({ dagId: "test_dag", dagRunId: "run_in_range" }), + expect.objectContaining({ dry_run: false, only_failed: false, only_new: false }), + ); + expect(onClear).toHaveBeenCalledWith( + expect.objectContaining({ dagId: "test_dag", dagRunId: "run_before_filter" }), + expect.objectContaining({ dry_run: false, only_failed: false, only_new: false }), + ); + }); +}); + +describe("DagRuns bulk mark-as", () => { + it("fires one PATCH with the chosen state per affected run", async () => { + const onPatch = vi.fn<(params: Record, body: unknown) => void>(); + + server.use( + http.patch("/api/v2/dags/:dagId/dagRuns/:dagRunId", async ({ params, request }) => { + const body: unknown = await request.json(); + + onPatch(params as Record, body); + + return HttpResponse.json({ dag_id: params.dagId, dag_run_id: params.dagRunId }); + }), + ); + + render(); + + await selectRow("run_in_range"); + await selectRow("run_before_filter"); + await waitFor(() => expect(screen.getByText(/2\s+Selected/iu)).toBeInTheDocument()); + + // Open the Mark As menu — both seeded runs are in "success" state, so + // only the "failed" entry has a non-zero affected count. + const bulkMarkBtn = findBulkActionButton(/mark/iu); + + expect(bulkMarkBtn).toBeDefined(); + fireEvent.click(bulkMarkBtn as HTMLElement); + + // In the bulk Mark-As menu, items are plain Menu.Item nodes (no + // `data-testid`); the per-row MarkRunAsButton menus render their own + // menuitems with `data-testid="mark-run-as-*"`. Pick by absence of that + // testid so we always target the bulk menu regardless of which menu is + // open. + await waitFor(() => expect(screen.getAllByRole("menuitem").length).toBeGreaterThan(0)); + const failedItem = screen + .getAllByRole("menuitem") + .find((mi) => mi.dataset.value === "failed" && mi.dataset.testid === undefined); + + expect(failedItem).toBeDefined(); + fireEvent.click(failedItem as HTMLElement); + + await waitFor(() => expect(screen.getAllByRole("dialog")).toHaveLength(2)); + const dialogs = screen.getAllByRole("dialog"); + const confirmDialog = dialogs[dialogs.length - 1] as HTMLElement; + + fireEvent.click(within(confirmDialog).getByRole("button", { name: /confirm/iu })); + + await waitFor(() => expect(onPatch).toHaveBeenCalledTimes(2)); + expect(onPatch).toHaveBeenCalledWith( + expect.objectContaining({ dagId: "test_dag", dagRunId: "run_in_range" }), + expect.objectContaining({ state: "failed" }), + ); + }); +}); diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index 11cb870f77c71..9575fd560e0f6 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -27,6 +27,7 @@ import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ClearRunButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; +import { useRowSelection, type GetColumnsParams } from "src/components/DataTable/useRowSelection"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { LimitedItemsList } from "src/components/LimitedItemsList"; @@ -36,13 +37,21 @@ import { RunTypeIcon } from "src/components/RunTypeIcon"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; +import { ActionBar } from "src/components/ui/ActionBar"; +import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch"; +import BulkClearDagRunsButton from "src/pages/BulkClearDagRunsButton"; +import BulkDeleteDagRunsButton from "src/pages/BulkDeleteDagRunsButton"; +import BulkMarkDagRunsAsButton from "src/pages/BulkMarkDagRunsAsButton"; import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; type DagRunRow = { row: { original: DAGRunResponse } }; + +const getRowKey = (dagRun: DAGRunResponse) => `${dagRun.dag_id}:${dagRun.dag_run_id}`; + const { BUNDLE_VERSION: BUNDLE_VERSION_PARAM, CONF_CONTAINS: CONF_CONTAINS_PARAM, @@ -66,7 +75,43 @@ const { TRIGGERING_USER_NAME_PATTERN: TRIGGERING_USER_NAME_PATTERN_PARAM, }: SearchParamsKeysType = SearchParamsKeys; -const runColumns = (translate: TFunction, dagId?: string): Array> => [ +type ColumnsParams = { + readonly dagId?: string; + readonly translate: TFunction; +}; + +const runColumns = ({ + allRowsSelected, + dagId, + onRowSelect, + onSelectAll, + selectedRows, + translate, +}: ColumnsParams & GetColumnsParams): Array> => [ + { + accessorKey: "select", + cell: ({ row }) => ( + onRowSelect(getRowKey(row.original), Boolean(event.checked))} + /> + ), + enableHiding: false, + enableSorting: false, + header: () => ( + onSelectAll(Boolean(event.checked))} + /> + ), + meta: { + skeletonWidth: 10, + }, + }, ...(Boolean(dagId) ? [] : [ @@ -292,11 +337,30 @@ export const DagRuns = () => { }, ); - const columns = runColumns(translate, dagId); - const nextCursor = data?.next_cursor ?? undefined; const previousCursor = data?.previous_cursor ?? undefined; + const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, selectedRows } = + useRowSelection({ + data: data?.dag_runs, + getKey: getRowKey, + }); + + const selectedDagRuns = (data?.dag_runs ?? []).filter((dr) => selectedRows.has(getRowKey(dr))); + + const columns = runColumns({ + allRowsSelected, + dagId, + // GetColumnsParams requires `multiTeam`, but the Dag Runs columns do not + // branch on it (unlike `Variables` / `Connections`); kept false to satisfy + // the shared selection-column type. + multiTeam: false, + onRowSelect: handleRowSelect, + onSelectAll: handleSelectAll, + selectedRows, + translate, + }); + return ( <> @@ -311,6 +375,18 @@ export const DagRuns = () => { onStateChange={setTableURLState} previousCursor={previousCursor} /> + + + + {selectedRows.size} {translate("selected")} + + + + + + + + ); }; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkDagRuns.ts new file mode 100644 index 0000000000000..5cb79075e0993 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useBulkDagRuns.ts @@ -0,0 +1,200 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { useDagRunServiceGetDagRunsKey, useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries"; +import { DagRunService } from "openapi/requests/services.gen"; +import type { DAGRunPatchStates, DAGRunResponse } from "openapi/requests/types.gen"; +import { toaster } from "src/components/ui"; + +// NOTE: Until a real bulk Dag Runs API endpoint exists (analogous to +// `bulkTaskInstances` / `bulkVariables`), the actions below fan out to one +// request per Dag Run via Promise.allSettled. This means up to N (or 2N for +// clear-with-note) concurrent writes when the user selects N runs and partial +// failures are surfaced after the fact. + +type Props = { + readonly clearSelections: VoidFunction; + readonly onSuccessConfirm: VoidFunction; +}; + +export type BulkClearDagRunsOptions = { + note: string | null; + onlyFailed: boolean; + onlyNew: boolean; + runOnLatestVersion: boolean; +}; + +type BulkMarkOptions = { + note: string | null; + state: DAGRunPatchStates; +}; + +type ToasterKey = "toaster.bulkClear" | "toaster.bulkDelete" | "toaster.bulkUpdate"; + +export const useBulkDagRuns = ({ clearSelections, onSuccessConfirm }: Props) => { + const queryClient = useQueryClient(); + const [error, setError] = useState(undefined); + const [isPending, setIsPending] = useState(false); + const { t: translate } = useTranslation(["common", "dags"]); + + const invalidateQueries = async () => { + await Promise.all([ + queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), + ]); + }; + + const finalize = ( + dagRuns: Array, + results: Array>, + toasterKey: ToasterKey, + ) => { + const successDagRuns = dagRuns.filter((_, index) => results[index]?.status === "fulfilled"); + const firstRejection = results.find( + (result): result is PromiseRejectedResult => result.status === "rejected", + ); + + if (firstRejection) { + setError(firstRejection.reason); + } else { + setError(undefined); + } + + if (successDagRuns.length > 0) { + toaster.create({ + description: translate(`${toasterKey}.success.description`, { + count: successDagRuns.length, + keys: successDagRuns.map((dr) => dr.dag_run_id).join(", "), + resourceName: translate("dagRun_other"), + }), + title: translate(`${toasterKey}.success.title`, { + resourceName: translate("dagRun_other"), + }), + type: "success", + }); + } + + if (!firstRejection) { + clearSelections(); + onSuccessConfirm(); + } + }; + + const bulkClear = async (dagRuns: Array, options: BulkClearDagRunsOptions) => { + setError(undefined); + setIsPending(true); + + try { + const clearResults = await Promise.allSettled( + dagRuns.map((dagRun) => + DagRunService.clearDagRun({ + dagId: dagRun.dag_id, + dagRunId: dagRun.dag_run_id, + requestBody: { + dry_run: false, + only_failed: options.onlyFailed, + only_new: options.onlyNew, + run_on_latest_version: options.runOnLatestVersion, + }, + }), + ), + ); + + // Only patch notes for runs whose clear succeeded. Patch failures here + // are intentionally swallowed so a flaky note write does not undo the + // visible "successful clear" outcome; we still report the original + // clear-side rejections via finalize(). + if (options.note !== null) { + const toPatch = dagRuns.filter((_, index) => clearResults[index]?.status === "fulfilled"); + + if (toPatch.length > 0) { + await Promise.allSettled( + toPatch.map((dagRun) => + DagRunService.patchDagRun({ + dagId: dagRun.dag_id, + dagRunId: dagRun.dag_run_id, + requestBody: { note: options.note }, + updateMask: ["note"], + }), + ), + ); + } + } + + await invalidateQueries(); + finalize(dagRuns, clearResults, "toaster.bulkClear"); + } catch (_error) { + setError(_error); + } + setIsPending(false); + }; + + const bulkDelete = async (dagRuns: Array) => { + setError(undefined); + setIsPending(true); + + try { + const results = await Promise.allSettled( + dagRuns.map((dagRun) => + DagRunService.deleteDagRun({ + dagId: dagRun.dag_id, + dagRunId: dagRun.dag_run_id, + }), + ), + ); + + await invalidateQueries(); + finalize(dagRuns, results, "toaster.bulkDelete"); + } catch (_error) { + setError(_error); + } + setIsPending(false); + }; + + const bulkMarkAs = async (dagRuns: Array, options: BulkMarkOptions) => { + setError(undefined); + setIsPending(true); + + const updateMask = options.note === null ? ["state"] : ["state", "note"]; + + try { + const results = await Promise.allSettled( + dagRuns.map((dagRun) => + DagRunService.patchDagRun({ + dagId: dagRun.dag_id, + dagRunId: dagRun.dag_run_id, + requestBody: { note: options.note, state: options.state }, + updateMask, + }), + ), + ); + + await invalidateQueries(); + finalize(dagRuns, results, "toaster.bulkUpdate"); + } catch (_error) { + setError(_error); + } + setIsPending(false); + }; + + return { bulkClear, bulkDelete, bulkMarkAs, error, isPending, setError }; +}; From 67037eb31d9ef2c71111151ae8abe24ed70e9fc7 Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Fri, 8 May 2026 21:39:36 +0900 Subject: [PATCH 02/11] Fix CI: apply prettier formatting --- airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx index 3d1a2e853cb91..2b359dbb552f2 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx @@ -104,9 +104,7 @@ const selectRow = async (runText: string) => { // matches the label regex, which is independent of locale state (the regex // matches the i18n key under tests, the translated string in production). const findBulkActionButton = (label: RegExp) => - screen - .getAllByRole("button", { name: label }) - .find((btn) => label.test(btn.textContent)); + screen.getAllByRole("button", { name: label }).find((btn) => label.test(btn.textContent)); describe("DagRuns bulk delete", () => { it("fires one DELETE per selected run and closes the dialog on success", async () => { From 7fb0e99b07c67933eed505e3b3fefe4941c87e3e Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Fri, 8 May 2026 21:46:40 +0900 Subject: [PATCH 03/11] Move DagRuns page files into pages/DagRuns/ subdir --- .../{ => DagRuns}/BulkClearDagRunsButton.tsx | 0 .../{ => DagRuns}/BulkDeleteDagRunsButton.tsx | 0 .../{ => DagRuns}/BulkMarkDagRunsAsButton.tsx | 0 .../src/pages/{ => DagRuns}/DagRuns.test.tsx | 0 .../ui/src/pages/{ => DagRuns}/DagRuns.tsx | 9 +++++---- .../pages/{ => DagRuns}/DagRunsFilters.tsx | 0 .../src/airflow/ui/src/pages/DagRuns/index.ts | 19 +++++++++++++++++++ 7 files changed, 24 insertions(+), 4 deletions(-) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/BulkClearDagRunsButton.tsx (100%) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/BulkDeleteDagRunsButton.tsx (100%) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/BulkMarkDagRunsAsButton.tsx (100%) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/DagRuns.test.tsx (100%) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/DagRuns.tsx (98%) rename airflow-core/src/airflow/ui/src/pages/{ => DagRuns}/DagRunsFilters.tsx (100%) create mode 100644 airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts diff --git a/airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/BulkClearDagRunsButton.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/BulkDeleteDagRunsButton.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkMarkDagRunsAsButton.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/BulkMarkDagRunsAsButton.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/BulkMarkDagRunsAsButton.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx similarity index 98% rename from airflow-core/src/airflow/ui/src/pages/DagRuns.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx index 9575fd560e0f6..d181a5360bd66 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx @@ -41,11 +41,12 @@ import { ActionBar } from "src/components/ui/ActionBar"; import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch"; -import BulkClearDagRunsButton from "src/pages/BulkClearDagRunsButton"; -import BulkDeleteDagRunsButton from "src/pages/BulkDeleteDagRunsButton"; -import BulkMarkDagRunsAsButton from "src/pages/BulkMarkDagRunsAsButton"; -import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; + +import BulkClearDagRunsButton from "./BulkClearDagRunsButton"; +import BulkDeleteDagRunsButton from "./BulkDeleteDagRunsButton"; +import BulkMarkDagRunsAsButton from "./BulkMarkDagRunsAsButton"; +import { DagRunsFilters } from "./DagRunsFilters"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; type DagRunRow = { row: { original: DAGRunResponse } }; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx similarity index 100% rename from airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts new file mode 100644 index 0000000000000..26ed3a4da898f --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts @@ -0,0 +1,19 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +export { DagRuns } from "./DagRuns"; From 7e1ca4e3ade66489303acdf990852443e4658f4e Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Sat, 9 May 2026 22:47:33 +0900 Subject: [PATCH 04/11] Add bulk Dag Runs API endpoints (PATCH and POST /clear) --- .../core_api/datamodels/dag_run.py | 43 ++ .../openapi/v2-rest-api-generated.yaml | 345 ++++++++++++- .../core_api/routes/public/dag_run.py | 76 ++- .../core_api/services/public/dag_run.py | 461 +++++++++++++++++- .../airflow/ui/openapi-gen/queries/common.ts | 4 +- .../airflow/ui/openapi-gen/queries/queries.ts | 80 ++- .../ui/openapi-gen/requests/schemas.gen.ts | 253 ++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 87 +++- .../ui/openapi-gen/requests/types.gen.ts | 173 ++++++- .../pages/DagRuns/BulkClearDagRunsButton.tsx | 6 +- .../ui/src/pages/DagRuns/DagRuns.test.tsx | 139 +++--- .../airflow/ui/src/pages/DagRuns/DagRuns.tsx | 2 +- .../airflow/ui/src/queries/useBulkDagRuns.ts | 176 +++---- .../core_api/routes/public/test_dag_run.py | 407 ++++++++++++++++ 14 files changed, 2042 insertions(+), 210 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 481cc0387fed1..d413af69a0b6f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -73,6 +73,49 @@ def validate_model(cls, data: Any) -> Any: return data +class BulkDagRunBody(DAGRunPatchBody, StrictBaseModel): + """Request body for bulk update and delete Dag runs.""" + + dag_run_id: str + dag_id: str | None = None + + +class DagRunIdentifier(StrictBaseModel): + """Identifier for a Dag run targeted by a bulk operation.""" + + dag_run_id: str + dag_id: str | None = None + + +class BulkClearDagRunsBody(StrictBaseModel): + """Request body for the bulk clear Dag runs endpoint.""" + + runs: list[DagRunIdentifier] + only_failed: bool = False + only_new: bool = Field( + default=False, + description="Only queue newly added tasks in the latest Dag version without clearing existing tasks.", + ) + run_on_latest_version: bool = Field( + default=False, + description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.", + ) + dry_run: bool = True + note: str | None = Field( + default=None, + max_length=1000, + description="Optional note applied to every Dag Run that is successfully cleared. Ignored on dry runs.", + ) + + @model_validator(mode="before") + @classmethod + def validate_model(cls, data: Any) -> Any: + """Validate clear Dag runs form.""" + if data.get("only_new") and data.get("only_failed"): + raise ValueError("only_new and only_failed are mutually exclusive") + return data + + class DAGRunResponse(BaseModel): """Dag Run serializer for responses.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..4a1041f0e034c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2107,12 +2107,25 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: - post: + /api/v2/dags/{dag_id}/dagRuns: + patch: tags: - DagRun - summary: Clear Dag Run - operationId: clear_dag_run + summary: Bulk Dag Runs + description: 'Bulk update and delete Dag runs. + + + A single request handles many Dag runs in one transaction. Per-entity + + failures are reported via ``BulkResponse`` so that a partial failure does + + not abort the whole batch. + + + The path''s ``dag_id`` may be ``~`` for cross-DAG operations; in that case + + each entity must specify its own ``dag_id`` in the body.' + operationId: bulk_dag_runs security: - OAuth2PasswordBearer: [] - HTTPBearer: [] @@ -2123,28 +2136,19 @@ paths: schema: type: string title: Dag Id - - name: dag_run_id - in: path - required: true - schema: - type: string - title: Dag Run Id requestBody: required: true content: application/json: schema: - $ref: '#/components/schemas/DAGRunClearBody' + $ref: '#/components/schemas/BulkBody_BulkDagRunBody_' responses: '200': description: Successful Response content: application/json: schema: - anyOf: - - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' - - $ref: '#/components/schemas/DAGRunResponse' - title: Response Clear Dag Run + $ref: '#/components/schemas/BulkResponse' '401': content: application/json: @@ -2157,19 +2161,12 @@ paths: schema: $ref: '#/components/schemas/HTTPExceptionResponse' description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found '422': description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/dagRuns: get: tags: - DagRun @@ -2792,6 +2789,134 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/clear: + post: + tags: + - DagRun + summary: Post Clear Dag Runs + description: 'Clear multiple Dag runs in a single request. + + + Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + + each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + + and per-entry failures are reported via ``BulkActionResponse.errors``. + + + The path''s ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + + must reference the same ``dag_id`` as the path.' + operationId: post_clear_dag_runs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/BulkClearDagRunsBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BulkActionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear: + post: + tags: + - DagRun + summary: Clear Dag Run + operationId: clear_dag_run + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunClearBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse' + - $ref: '#/components/schemas/DAGRunResponse' + title: Response Clear Dag Run + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait: get: tags: @@ -10775,6 +10900,21 @@ components: This structure helps users understand which key actions succeeded and which failed.' + BulkBody_BulkDagRunBody_: + properties: + actions: + items: + oneOf: + - $ref: '#/components/schemas/BulkCreateAction_BulkDagRunBody_' + - $ref: '#/components/schemas/BulkUpdateAction_BulkDagRunBody_' + - $ref: '#/components/schemas/BulkDeleteAction_BulkDagRunBody_' + type: array + title: Actions + additionalProperties: false + type: object + required: + - actions + title: BulkBody[BulkDagRunBody] BulkBody_BulkTaskInstanceBody_: properties: actions: @@ -10835,6 +10975,69 @@ components: required: - actions title: BulkBody[VariableBody] + BulkClearDagRunsBody: + properties: + runs: + items: + $ref: '#/components/schemas/DagRunIdentifier' + type: array + title: Runs + only_failed: + type: boolean + title: Only Failed + default: false + only_new: + type: boolean + title: Only New + description: Only queue newly added tasks in the latest Dag version without + clearing existing tasks. + default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + description: (Experimental) Run on the latest bundle version of the Dag + after clearing the Dag Run. + default: false + dry_run: + type: boolean + title: Dry Run + default: true + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note + description: Optional note applied to every Dag Run that is successfully + cleared. Ignored on dry runs. + additionalProperties: false + type: object + required: + - runs + title: BulkClearDagRunsBody + description: Request body for the bulk clear Dag runs endpoint. + BulkCreateAction_BulkDagRunBody_: + properties: + action: + type: string + const: create + title: Action + description: The action to be performed on the entities. + entities: + items: + $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entities to be created. + action_on_existence: + $ref: '#/components/schemas/BulkActionOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkCreateAction[BulkDagRunBody] BulkCreateAction_BulkTaskInstanceBody_: properties: action: @@ -10923,6 +11126,56 @@ components: - action - entities title: BulkCreateAction[VariableBody] + BulkDagRunBody: + properties: + state: + anyOf: + - $ref: '#/components/schemas/DAGRunPatchStates' + - type: 'null' + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note + dag_run_id: + type: string + title: Dag Run Id + dag_id: + anyOf: + - type: string + - type: 'null' + title: Dag Id + additionalProperties: false + type: object + required: + - dag_run_id + title: BulkDagRunBody + description: Request body for bulk update and delete Dag runs. + BulkDeleteAction_BulkDagRunBody_: + properties: + action: + type: string + const: delete + title: Action + description: The action to be performed on the entities. + entities: + items: + anyOf: + - type: string + - $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entity id/key or entity objects to be deleted. + action_on_non_existence: + $ref: '#/components/schemas/BulkActionNotOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkDeleteAction[BulkDagRunBody] BulkDeleteAction_BulkTaskInstanceBody_: properties: action: @@ -11104,6 +11357,38 @@ components: - task_id title: BulkTaskInstanceBody description: Request body for bulk update, and delete task instances. + BulkUpdateAction_BulkDagRunBody_: + properties: + action: + type: string + const: update + title: Action + description: The action to be performed on the entities. + entities: + items: + $ref: '#/components/schemas/BulkDagRunBody' + type: array + title: Entities + description: A list of entities to be updated. + update_mask: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Update Mask + description: A list of field names to update for each entity.Only these + fields will be applied from the request body to the database model.Any + extra fields provided will be ignored. + action_on_non_existence: + $ref: '#/components/schemas/BulkActionNotOnExistence' + default: fail + additionalProperties: false + type: object + required: + - action + - entities + title: BulkUpdateAction[BulkDagRunBody] BulkUpdateAction_BulkTaskInstanceBody_: properties: action: @@ -12604,6 +12889,22 @@ components: - partition_key title: DagRunAssetReference description: DagRun serializer for asset responses. + DagRunIdentifier: + properties: + dag_run_id: + type: string + title: Dag Run Id + dag_id: + anyOf: + - type: string + - type: 'null' + title: Dag Id + additionalProperties: false + type: object + required: + - dag_run_id + title: DagRunIdentifier + description: Identifier for a Dag run targeted by a bulk operation. DagRunState: type: string enum: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index a40676f8e186f..0e517d6cfaba8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -76,7 +76,14 @@ from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.base import OrmClause from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse +from airflow.api_fastapi.core_api.datamodels.common import ( + BulkActionResponse, + BulkBody, + BulkResponse, +) from airflow.api_fastapi.core_api.datamodels.dag_run import ( + BulkClearDagRunsBody, + BulkDagRunBody, DAGRunClearBody, DAGRunCollectionResponse, DAGRunPatchBody, @@ -97,7 +104,11 @@ requires_access_asset, requires_access_dag, ) -from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter +from airflow.api_fastapi.core_api.services.public.dag_run import ( + BulkDagRunService, + DagRunWaiter, + bulk_clear_dag_runs, +) from airflow.api_fastapi.logging.decorators import action_logging from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun @@ -285,6 +296,69 @@ def get_upstream_asset_events( ) +@dag_run_router.patch( + "", + dependencies=[ + Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), + Depends(action_logging()), + ], +) +def bulk_dag_runs( + request: BulkBody[BulkDagRunBody], + session: SessionDep, + dag_id: str, + dag_bag: DagBagDep, + user: GetUserDep, +) -> BulkResponse: + """ + Bulk update and delete Dag runs. + + A single request handles many Dag runs in one transaction. Per-entity + failures are reported via ``BulkResponse`` so that a partial failure does + not abort the whole batch. + + The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case + each entity must specify its own ``dag_id`` in the body. + """ + return BulkDagRunService( + session=session, request=request, dag_id=dag_id, dag_bag=dag_bag, user=user + ).handle_request() + + +@dag_run_router.post( + "/clear", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), + Depends(action_logging()), + ], +) +def post_clear_dag_runs( + dag_id: str, + body: BulkClearDagRunsBody, + dag_bag: DagBagDep, + session: SessionDep, + user: GetUserDep, +) -> BulkActionResponse: + """ + Clear multiple Dag runs in a single request. + + Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + and per-entry failures are reported via ``BulkActionResponse.errors``. + + The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + must reference the same ``dag_id`` as the path. + """ + return bulk_clear_dag_runs( + body=body, + dag_id=dag_id, + dag_bag=dag_bag, + session=session, + user=user, + ) + + @dag_run_router.post( "/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index e7d7cb98c939f..db13f5814adeb 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -20,12 +20,41 @@ import asyncio import itertools import json +import logging import operator -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import attrs +from fastapi import HTTPException, status from sqlalchemy import select +from sqlalchemy.orm import joinedload +from sqlalchemy.orm.session import Session +from airflow.api.common.mark_tasks import ( + set_dag_run_state_to_failed, + set_dag_run_state_to_queued, + set_dag_run_state_to_success, +) +from airflow.api_fastapi.app import get_auth_manager +from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails +from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run +from airflow.api_fastapi.core_api.datamodels.common import ( + BulkActionNotOnExistence, + BulkActionResponse, + BulkBody, + BulkCreateAction, + BulkDeleteAction, + BulkUpdateAction, +) +from airflow.api_fastapi.core_api.datamodels.dag_run import ( + BulkClearDagRunsBody, + BulkDagRunBody, + DAGRunPatchStates, +) +from airflow.api_fastapi.core_api.security import GetUserDep +from airflow.api_fastapi.core_api.services.public.common import BulkService +from airflow.listeners.listener import get_listener_manager +from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun from airflow.models.xcom import XCOM_RETURN_KEY, XComModel from airflow.utils.session import create_session_async @@ -34,6 +63,14 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterator + from airflow.serialization.definitions.dag import SerializedDAG + + +AuthMethod = Literal["GET", "PUT", "POST", "DELETE"] + + +log = logging.getLogger(__name__) + @attrs.define class DagRunWaiter: @@ -86,3 +123,425 @@ async def wait(self) -> AsyncGenerator[str, None]: await asyncio.sleep(self.interval) yield await self._serialize_response(dag_run := await self._get_dag_run()) yield "\n" + + +def _format_dag_run_key(dag_id: str, dag_run_id: str) -> str: + return f"{dag_id}.{dag_run_id}" + + +def _authorize_dag_run( + *, + session: Session, + user, + dag_id: str, + method: AuthMethod, + cache: dict[str, bool], +) -> bool: + """ + Return whether ``user`` may perform ``method`` on Dag runs of ``dag_id``. + + The result is memoised in ``cache`` so a bulk request that touches many + runs of the same Dag only pays for one ``is_authorized_dag`` call per Dag. + """ + if dag_id not in cache: + team_name = DagModel.get_team_name(dag_id, session=session) + cache[dag_id] = get_auth_manager().is_authorized_dag( + method=method, + access_entity=DagAccessEntity.RUN, + details=DagDetails(id=dag_id, team_name=team_name), + user=user, + ) + return cache[dag_id] + + +def _apply_state_change( + dag_run: DagRun, + new_state: DAGRunPatchStates, + dag: SerializedDAG, + session: Session, +) -> None: + """Apply ``new_state`` to ``dag_run`` and fire the matching listener hook.""" + if new_state == DAGRunPatchStates.SUCCESS: + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + try: + get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="") + except Exception: + log.exception("error calling listener") + elif new_state == DAGRunPatchStates.QUEUED: + # Notification on queued is intentionally skipped; the scheduler emits + # the RUNNING notification instead. + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif new_state == DAGRunPatchStates.FAILED: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + try: + get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="") + except Exception: + log.exception("error calling listener") + + +def _apply_note(dag_run: DagRun, note: str | None, user_id: str) -> None: + if dag_run.dag_run_note is None: + dag_run.note = (note, user_id) + else: + dag_run.dag_run_note.content = note + dag_run.dag_run_note.user_id = user_id + + +def _validate_no_wildcard_in_resolved( + *, + dag_id: str, + dag_run_id: str, + results: BulkActionResponse, +) -> bool: + if dag_id == "~" or dag_run_id == "~": + results.errors.append( + { + "error": ( + "When the path uses the ``~`` wildcard, ``dag_id`` and ``dag_run_id`` must be " + "specified in the body for each entity." + ), + "status_code": status.HTTP_400_BAD_REQUEST, + } + ) + return False + return True + + +def _validate_path_dag_id_match( + *, + path_dag_id: str, + entity_dag_id: str | None, + dag_run_id: str, + results: BulkActionResponse, +) -> bool: + if path_dag_id != "~" and entity_dag_id is not None and entity_dag_id != path_dag_id: + results.errors.append( + { + "error": ( + f"Entity dag_id '{entity_dag_id}' does not match path dag_id '{path_dag_id}'. " + "Use ``~`` in the path for cross-DAG bulk operations." + ), + "status_code": status.HTTP_400_BAD_REQUEST, + "dag_id": entity_dag_id, + "dag_run_id": dag_run_id, + } + ) + return False + return True + + +class BulkDagRunService(BulkService[BulkDagRunBody]): + """Service for handling bulk operations on Dag runs.""" + + def __init__( + self, + session: Session, + request: BulkBody[BulkDagRunBody], + dag_id: str, + dag_bag: DagBagDep, + user: GetUserDep, + ): + super().__init__(session, request) + self.dag_id = dag_id + self.dag_bag = dag_bag + self.user = user + + def _resolve_identifiers(self, entity: str | BulkDagRunBody) -> tuple[str, str]: + """Return ``(dag_id, dag_run_id)`` for an entity, falling back to the path's ``dag_id``.""" + if isinstance(entity, str): + return self.dag_id, entity + dag_id = entity.dag_id or self.dag_id + return dag_id, entity.dag_run_id + + def _check_dag_authorization( + self, + dag_id: str, + method: AuthMethod, + action_name: str, + results: BulkActionResponse, + cache: dict[str, bool], + ) -> bool: + if not _authorize_dag_run( + session=self.session, + user=self.user, + dag_id=dag_id, + method=method, + cache=cache, + ): + results.errors.append( + { + "error": f"User is not authorized to {action_name} Dag runs for DAG '{dag_id}'", + "status_code": status.HTTP_403_FORBIDDEN, + } + ) + return False + return True + + def _fetch_dag_runs( + self, + keys: set[tuple[str, str]], + ) -> tuple[dict[tuple[str, str], DagRun], set[tuple[str, str]]]: + if not keys: + return {}, set() + keys_list = list(keys) + dag_runs = self.session.scalars( + select(DagRun) + .options(joinedload(DagRun.dag_model)) + .where( + DagRun.dag_id.in_({k[0] for k in keys_list}), + DagRun.run_id.in_({k[1] for k in keys_list}), + ) + ).all() + found = {(dr.dag_id, dr.run_id): dr for dr in dag_runs if (dr.dag_id, dr.run_id) in keys} + not_found = keys - set(found.keys()) + return found, not_found + + def handle_bulk_create( + self, action: BulkCreateAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + results.errors.append( + { + "error": "Dag runs bulk create is not supported via this endpoint; use the trigger Dag run endpoint instead.", + "status_code": status.HTTP_405_METHOD_NOT_ALLOWED, + } + ) + + def handle_bulk_update( + self, action: BulkUpdateAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + """Bulk update Dag runs (state and/or note).""" + update_mask = action.update_mask + auth_cache: dict[str, bool] = {} + keys: set[tuple[str, str]] = set() + entity_map: dict[tuple[str, str], BulkDagRunBody] = {} + + for entity in action.entities: + if isinstance(entity, str): + results.errors.append( + { + "error": "Bulk update requires entities as objects, not strings.", + "status_code": status.HTTP_400_BAD_REQUEST, + } + ) + continue + dag_id, dag_run_id = self._resolve_identifiers(entity) + if not _validate_no_wildcard_in_resolved(dag_id=dag_id, dag_run_id=dag_run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=self.dag_id, + entity_dag_id=entity.dag_id, + dag_run_id=dag_run_id, + results=results, + ): + continue + if not self._check_dag_authorization(dag_id, "PUT", action.action.value, results, auth_cache): + continue + keys.add((dag_id, dag_run_id)) + entity_map[(dag_id, dag_run_id)] = entity + + try: + found, not_found = self._fetch_dag_runs(keys) + + if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found: + missing = [{"dag_id": d, "dag_run_id": r} for d, r in not_found] + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"The Dag runs with these identifiers were not found: {missing}", + ) + + for key, dag_run in found.items(): + entity = entity_map[key] + fields_to_update = entity.model_fields_set + if update_mask: + fields_to_update = fields_to_update.intersection(update_mask) + fields_to_update = fields_to_update - {"dag_id", "dag_run_id"} + if not fields_to_update: + continue + + try: + with self.session.begin_nested(): + dag = get_dag_for_run(self.dag_bag, dag_run, session=self.session) + if "state" in fields_to_update and entity.state is not None: + _apply_state_change(dag_run, entity.state, dag, self.session) + if "note" in fields_to_update: + refreshed = self.session.get(DagRun, dag_run.id) + if refreshed is not None: + _apply_note(refreshed, entity.note, self.user.get_id()) + except HTTPException as exc: + results.errors.append( + { + "error": str(exc.detail), + "status_code": exc.status_code, + "dag_id": key[0], + "dag_run_id": key[1], + } + ) + continue + except Exception as exc: + results.errors.append( + { + "error": str(exc), + "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR, + "dag_id": key[0], + "dag_run_id": key[1], + } + ) + continue + + results.success.append(_format_dag_run_key(*key)) + except HTTPException as e: + results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) + + def handle_bulk_delete( + self, action: BulkDeleteAction[BulkDagRunBody], results: BulkActionResponse + ) -> None: + """Bulk delete Dag runs.""" + auth_cache: dict[str, bool] = {} + keys: set[tuple[str, str]] = set() + + for entity in action.entities: + dag_id, dag_run_id = self._resolve_identifiers(entity) + entity_dag_id = entity.dag_id if isinstance(entity, BulkDagRunBody) else None + if not _validate_no_wildcard_in_resolved(dag_id=dag_id, dag_run_id=dag_run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=self.dag_id, + entity_dag_id=entity_dag_id, + dag_run_id=dag_run_id, + results=results, + ): + continue + if not self._check_dag_authorization(dag_id, "DELETE", action.action.value, results, auth_cache): + continue + keys.add((dag_id, dag_run_id)) + + try: + found, not_found = self._fetch_dag_runs(keys) + + if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found: + missing = [{"dag_id": d, "dag_run_id": r} for d, r in not_found] + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"The Dag runs with these identifiers were not found: {missing}", + ) + + deletable_states = {s.value for s in DAGRunPatchStates} + for key, dag_run in found.items(): + if dag_run.state not in deletable_states: + results.errors.append( + { + "error": ( + f"The DagRun with dag_id: `{dag_run.dag_id}` and run_id: `{dag_run.run_id}` " + f"cannot be deleted in {dag_run.state} state" + ), + "status_code": status.HTTP_409_CONFLICT, + "dag_id": dag_run.dag_id, + "dag_run_id": dag_run.run_id, + } + ) + continue + self.session.delete(dag_run) + results.success.append(_format_dag_run_key(*key)) + except HTTPException as e: + results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) + + +def bulk_clear_dag_runs( + body: BulkClearDagRunsBody, + dag_id: str, + dag_bag: DagBagDep, + session: Session, + user: GetUserDep, +) -> BulkActionResponse: + """ + Run ``dag.clear()`` for each ``(dag_id, dag_run_id)`` in ``body.runs`` within a single transaction. + + Returns ``BulkActionResponse`` with per-run success keys and per-run failure entries so that a partial + failure does not abort the entire batch. + """ + results = BulkActionResponse() + auth_cache: dict[str, bool] = {} + + for identifier in body.runs: + run_dag_id = identifier.dag_id or dag_id + run_id = identifier.dag_run_id + + if not _validate_no_wildcard_in_resolved(dag_id=run_dag_id, dag_run_id=run_id, results=results): + continue + if not _validate_path_dag_id_match( + path_dag_id=dag_id, + entity_dag_id=identifier.dag_id, + dag_run_id=run_id, + results=results, + ): + continue + + if not _authorize_dag_run( + session=session, user=user, dag_id=run_dag_id, method="PUT", cache=auth_cache + ): + results.errors.append( + { + "error": f"User is not authorized to clear Dag runs for DAG '{run_dag_id}'", + "status_code": status.HTTP_403_FORBIDDEN, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + dag_run = session.scalar( + select(DagRun) + .options(joinedload(DagRun.dag_model)) + .where(DagRun.dag_id == run_dag_id, DagRun.run_id == run_id) + ) + if dag_run is None: + results.errors.append( + { + "error": f"Dag run not found for dag_id '{run_dag_id}', dag_run_id '{run_id}'", + "status_code": status.HTTP_404_NOT_FOUND, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + try: + with session.begin_nested(): + dag = get_dag_for_run(dag_bag, dag_run, session=session) + dag.clear( + run_id=run_id, + task_ids=None, + only_failed=body.only_failed, + only_new=body.only_new, + run_on_latest_version=body.run_on_latest_version, + dry_run=body.dry_run, + session=session, + ) + if body.note is not None and not body.dry_run: + refreshed = session.get(DagRun, dag_run.id) + if refreshed is not None: + _apply_note(refreshed, body.note, user.get_id()) + except HTTPException as exc: + results.errors.append( + { + "error": str(exc.detail), + "status_code": exc.status_code, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + except Exception as exc: + results.errors.append( + { + "error": str(exc), + "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR, + "dag_id": run_dag_id, + "dag_run_id": run_id, + } + ) + continue + + results.success.append(_format_dag_run_key(run_dag_id, run_id)) + + return results diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index e0976d31b7276..dd632ead7ed71 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -987,8 +987,9 @@ export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited>; export type ConnectionServiceTestConnectionMutationResult = Awaited>; export type ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited>; -export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceTriggerDagRunMutationResult = Awaited>; +export type DagRunServicePostClearDagRunsMutationResult = Awaited>; +export type DagRunServiceClearDagRunMutationResult = Awaited>; export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited>; export type DagServiceFavoriteDagMutationResult = Awaited>; export type DagServiceUnfavoriteDagMutationResult = Awaited>; @@ -1005,6 +1006,7 @@ export type DagParsingServiceReparseDagFileMutationResult = Awaited>; export type ConnectionServiceBulkConnectionsMutationResult = Awaited>; export type DagRunServicePatchDagRunMutationResult = Awaited>; +export type DagRunServiceBulkDagRunsMutationResult = Awaited>; export type DagServicePatchDagsMutationResult = Awaited>; export type DagServicePatchDagMutationResult = Awaited>; export type TaskInstanceServicePatchTaskInstanceMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 52e4776d510a5..7890db1bd3fee 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2,7 +2,7 @@ import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { BackfillPostBody, BulkBody_BulkDagRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, BulkClearDagRunsBody, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -2067,6 +2067,45 @@ export const useConnectionServiceTestConnection = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: () => ConnectionService.createDefaultConnections() as unknown as Promise, ...options }); /** +* Trigger Dag Run +* Trigger a Dag. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns DAGRunResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServiceTriggerDagRun = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); +/** +* Post Clear Dag Runs +* Clear multiple Dag runs in a single request. +* +* Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: +* each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction +* and per-entry failures are reported via ``BulkActionResponse.errors``. +* +* The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry +* must reference the same ``dag_id`` as the path. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns BulkActionResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServicePostClearDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.postClearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Clear Dag Run * @param data The data for the request. * @param data.dagId @@ -2085,22 +2124,6 @@ export const useDagRunServiceClearDagRun = ({ mutationFn: ({ dagId, dagRunId, requestBody }) => DagRunService.clearDagRun({ dagId, dagRunId, requestBody }) as unknown as Promise, ...options }); /** -* Trigger Dag Run -* Trigger a Dag. -* @param data The data for the request. -* @param data.dagId -* @param data.requestBody -* @returns DAGRunResponse Successful Response -* @throws ApiError -*/ -export const useDagRunServiceTriggerDagRun = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as Promise, ...options }); -/** * Get List Dag Runs Batch * Get a list of Dag Runs. * @param data The data for the request. @@ -2342,6 +2365,29 @@ export const useDagRunServicePatchDagRun = ({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as unknown as Promise, ...options }); /** +* Bulk Dag Runs +* Bulk update and delete Dag runs. +* +* A single request handles many Dag runs in one transaction. Per-entity +* failures are reported via ``BulkResponse`` so that a partial failure does +* not abort the whole batch. +* +* The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case +* each entity must specify its own ``dag_id`` in the body. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns BulkResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServiceBulkDagRuns = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.bulkDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Patch Dags * Patch multiple Dags. * diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index bc4c7179c2dbb..2fbcf6c0cccb5 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -620,6 +620,32 @@ The response includes a list of successful keys and any errors encountered durin This structure helps users understand which key actions succeeded and which failed.` } as const; +export const $BulkBody_BulkDagRunBody_ = { + properties: { + actions: { + items: { + oneOf: [ + { + '$ref': '#/components/schemas/BulkCreateAction_BulkDagRunBody_' + }, + { + '$ref': '#/components/schemas/BulkUpdateAction_BulkDagRunBody_' + }, + { + '$ref': '#/components/schemas/BulkDeleteAction_BulkDagRunBody_' + } + ] + }, + type: 'array', + title: 'Actions' + } + }, + additionalProperties: false, + type: 'object', + required: ['actions'], + title: 'BulkBody[BulkDagRunBody]' +} as const; + export const $BulkBody_BulkTaskInstanceBody_ = { properties: { actions: { @@ -724,6 +750,85 @@ export const $BulkBody_VariableBody_ = { title: 'BulkBody[VariableBody]' } as const; +export const $BulkClearDagRunsBody = { + properties: { + runs: { + items: { + '$ref': '#/components/schemas/DagRunIdentifier' + }, + type: 'array', + title: 'Runs' + }, + only_failed: { + type: 'boolean', + title: 'Only Failed', + default: false + }, + only_new: { + type: 'boolean', + title: 'Only New', + description: 'Only queue newly added tasks in the latest Dag version without clearing existing tasks.', + default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + description: '(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.', + default: false + }, + dry_run: { + type: 'boolean', + title: 'Dry Run', + default: true + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note', + description: 'Optional note applied to every Dag Run that is successfully cleared. Ignored on dry runs.' + } + }, + additionalProperties: false, + type: 'object', + required: ['runs'], + title: 'BulkClearDagRunsBody', + description: 'Request body for the bulk clear Dag runs endpoint.' +} as const; + +export const $BulkCreateAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'create', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + '$ref': '#/components/schemas/BulkDagRunBody' + }, + type: 'array', + title: 'Entities', + description: 'A list of entities to be created.' + }, + action_on_existence: { + '$ref': '#/components/schemas/BulkActionOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkCreateAction[BulkDagRunBody]' +} as const; + export const $BulkCreateAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -832,6 +937,87 @@ export const $BulkCreateAction_VariableBody_ = { title: 'BulkCreateAction[VariableBody]' } as const; +export const $BulkDagRunBody = { + properties: { + state: { + anyOf: [ + { + '$ref': '#/components/schemas/DAGRunPatchStates' + }, + { + type: 'null' + } + ] + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' + }, + dag_run_id: { + type: 'string', + title: 'Dag Run Id' + }, + dag_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Id' + } + }, + additionalProperties: false, + type: 'object', + required: ['dag_run_id'], + title: 'BulkDagRunBody', + description: 'Request body for bulk update and delete Dag runs.' +} as const; + +export const $BulkDeleteAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'delete', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + anyOf: [ + { + type: 'string' + }, + { + '$ref': '#/components/schemas/BulkDagRunBody' + } + ] + }, + type: 'array', + title: 'Entities', + description: 'A list of entity id/key or entity objects to be deleted.' + }, + action_on_non_existence: { + '$ref': '#/components/schemas/BulkActionNotOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkDeleteAction[BulkDagRunBody]' +} as const; + export const $BulkDeleteAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -1102,6 +1288,48 @@ export const $BulkTaskInstanceBody = { description: 'Request body for bulk update, and delete task instances.' } as const; +export const $BulkUpdateAction_BulkDagRunBody_ = { + properties: { + action: { + type: 'string', + const: 'update', + title: 'Action', + description: 'The action to be performed on the entities.' + }, + entities: { + items: { + '$ref': '#/components/schemas/BulkDagRunBody' + }, + type: 'array', + title: 'Entities', + description: 'A list of entities to be updated.' + }, + update_mask: { + anyOf: [ + { + items: { + type: 'string' + }, + type: 'array' + }, + { + type: 'null' + } + ], + title: 'Update Mask', + description: 'A list of field names to update for each entity.Only these fields will be applied from the request body to the database model.Any extra fields provided will be ignored.' + }, + action_on_non_existence: { + '$ref': '#/components/schemas/BulkActionNotOnExistence', + default: 'fail' + } + }, + additionalProperties: false, + type: 'object', + required: ['action', 'entities'], + title: 'BulkUpdateAction[BulkDagRunBody]' +} as const; + export const $BulkUpdateAction_BulkTaskInstanceBody_ = { properties: { action: { @@ -3401,6 +3629,31 @@ export const $DagRunAssetReference = { description: 'DagRun serializer for asset responses.' } as const; +export const $DagRunIdentifier = { + properties: { + dag_run_id: { + type: 'string', + title: 'Dag Run Id' + }, + dag_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Id' + } + }, + additionalProperties: false, + type: 'object', + required: ['dag_run_id'], + title: 'DagRunIdentifier', + description: 'Identifier for a Dag run targeted by a bulk operation.' +} as const; + export const $DagRunState = { type: 'string', enum: ['queued', 'running', 'success', 'failed'], diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 83c5d5c17b78d..1fd856a94b80d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, PostClearDagRunsData, PostClearDagRunsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -963,28 +963,33 @@ export class DagRunService { } /** - * Clear Dag Run + * Bulk Dag Runs + * Bulk update and delete Dag runs. + * + * A single request handles many Dag runs in one transaction. Per-entity + * failures are reported via ``BulkResponse`` so that a partial failure does + * not abort the whole batch. + * + * The path's ``dag_id`` may be ``~`` for cross-DAG operations; in that case + * each entity must specify its own ``dag_id`` in the body. * @param data The data for the request. * @param data.dagId - * @param data.dagRunId * @param data.requestBody - * @returns unknown Successful Response + * @returns BulkResponse Successful Response * @throws ApiError */ - public static clearDagRun(data: ClearDagRunData): CancelablePromise { + public static bulkDagRuns(data: BulkDagRunsData): CancelablePromise { return __request(OpenAPI, { - method: 'POST', - url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear', + method: 'PATCH', + url: '/api/v2/dags/{dag_id}/dagRuns', path: { - dag_id: data.dagId, - dag_run_id: data.dagRunId + dag_id: data.dagId }, body: data.requestBody, mediaType: 'application/json', errors: { 401: 'Unauthorized', 403: 'Forbidden', - 404: 'Not Found', 422: 'Validation Error' } }); @@ -1148,6 +1153,68 @@ export class DagRunService { }); } + /** + * Post Clear Dag Runs + * Clear multiple Dag runs in a single request. + * + * Mirrors the per-DAG bulk pattern of ``POST /dags/{dag_id}/clearTaskInstances``: + * each ``(dag_id, dag_run_id)`` in ``runs`` is processed in the same transaction + * and per-entry failures are reported via ``BulkActionResponse.errors``. + * + * The path's ``dag_id`` may be ``~`` for cross-DAG clears; otherwise each entry + * must reference the same ``dag_id`` as the path. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns BulkActionResponse Successful Response + * @throws ApiError + */ + public static postClearDagRuns(data: PostClearDagRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/dagRuns/clear', + path: { + dag_id: data.dagId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Clear Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static clearDagRun(data: ClearDagRunData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the Dag run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 5b216da03a58b..80df410dbed96 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -186,6 +186,10 @@ export type BulkActionResponse = { }>; }; +export type BulkBody_BulkDagRunBody_ = { + actions: Array<(BulkCreateAction_BulkDagRunBody_ | BulkUpdateAction_BulkDagRunBody_ | BulkDeleteAction_BulkDagRunBody_)>; +}; + export type BulkBody_BulkTaskInstanceBody_ = { actions: Array<(BulkCreateAction_BulkTaskInstanceBody_ | BulkUpdateAction_BulkTaskInstanceBody_ | BulkDeleteAction_BulkTaskInstanceBody_)>; }; @@ -202,6 +206,39 @@ export type BulkBody_VariableBody_ = { actions: Array<(BulkCreateAction_VariableBody_ | BulkUpdateAction_VariableBody_ | BulkDeleteAction_VariableBody_)>; }; +/** + * Request body for the bulk clear Dag runs endpoint. + */ +export type BulkClearDagRunsBody = { + runs: Array; + only_failed?: boolean; + /** + * Only queue newly added tasks in the latest Dag version without clearing existing tasks. + */ + only_new?: boolean; + /** + * (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. + */ + run_on_latest_version?: boolean; + dry_run?: boolean; + /** + * Optional note applied to every Dag Run that is successfully cleared. Ignored on dry runs. + */ + note?: string | null; +}; + +export type BulkCreateAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "create"; + /** + * A list of entities to be created. + */ + entities: Array; + action_on_existence?: BulkActionOnExistence; +}; + export type BulkCreateAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -250,6 +287,28 @@ export type BulkCreateAction_VariableBody_ = { action_on_existence?: BulkActionOnExistence; }; +/** + * Request body for bulk update and delete Dag runs. + */ +export type BulkDagRunBody = { + state?: DAGRunPatchStates | null; + note?: string | null; + dag_run_id: string; + dag_id?: string | null; +}; + +export type BulkDeleteAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "delete"; + /** + * A list of entity id/key or entity objects to be deleted. + */ + entities: Array<(string | BulkDagRunBody)>; + action_on_non_existence?: BulkActionNotOnExistence; +}; + export type BulkDeleteAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -336,6 +395,22 @@ export type BulkTaskInstanceBody = { dag_run_id?: string | null; }; +export type BulkUpdateAction_BulkDagRunBody_ = { + /** + * The action to be performed on the entities. + */ + action: "update"; + /** + * A list of entities to be updated. + */ + entities: Array; + /** + * A list of field names to update for each entity.Only these fields will be applied from the request body to the database model.Any extra fields provided will be ignored. + */ + update_mask?: Array<(string)> | null; + action_on_non_existence?: BulkActionNotOnExistence; +}; + export type BulkUpdateAction_BulkTaskInstanceBody_ = { /** * The action to be performed on the entities. @@ -841,6 +916,14 @@ export type DagRunAssetReference = { partition_key: string | null; }; +/** + * Identifier for a Dag run targeted by a bulk operation. + */ +export type DagRunIdentifier = { + dag_run_id: string; + dag_id?: string | null; +}; + /** * All possible states that a DagRun can be in. * @@ -2680,13 +2763,12 @@ export type GetUpstreamAssetEventsData = { export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; -export type ClearDagRunData = { +export type BulkDagRunsData = { dagId: string; - dagRunId: string; - requestBody: DAGRunClearBody; + requestBody: BulkBody_BulkDagRunBody_; }; -export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | DAGRunResponse; +export type BulkDagRunsResponse = BulkResponse; export type GetDagRunsData = { bundleVersion?: string | null; @@ -2784,6 +2866,21 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; +export type PostClearDagRunsData = { + dagId: string; + requestBody: BulkClearDagRunsBody; +}; + +export type PostClearDagRunsResponse = BulkActionResponse; + +export type ClearDagRunData = { + dagId: string; + dagRunId: string; + requestBody: DAGRunClearBody; +}; + +export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | DAGRunResponse; + export type WaitDagRunUntilFinishedData = { dagId: string; dagRunId: string; @@ -5040,14 +5137,14 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { - post: { - req: ClearDagRunData; + '/api/v2/dags/{dag_id}/dagRuns': { + patch: { + req: BulkDagRunsData; res: { /** * Successful Response */ - 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; + 200: BulkResponse; /** * Unauthorized */ @@ -5056,18 +5153,12 @@ export type $OpenApiTs = { * Forbidden */ 403: HTTPExceptionResponse; - /** - * Not Found - */ - 404: HTTPExceptionResponse; /** * Validation Error */ 422: HTTPValidationError; }; }; - }; - '/api/v2/dags/{dag_id}/dagRuns': { get: { req: GetDagRunsData; res: { @@ -5127,6 +5218,60 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/dagRuns/clear': { + post: { + req: PostClearDagRunsData; + res: { + /** + * Successful Response + */ + 200: BulkActionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': { + post: { + req: ClearDagRunData; + res: { + /** + * Successful Response + */ + 200: ClearTaskInstanceCollectionResponse | DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { get: { req: WaitDagRunUntilFinishedData; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx index 2b53c06e5fa8f..763479417e44d 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx @@ -44,14 +44,14 @@ const BulkClearDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => onSuccessConfirm: onClose, }); - const onlyFailed = selectedOptions.includes("onlyFailed"); - const onlyNew = selectedOptions.includes("newTasks"); - const handleClose = () => { setNote(null); onClose(); }; + const onlyFailed = selectedOptions.includes("onlyFailed"); + const onlyNew = selectedOptions.includes("newTasks"); + return ( <> - + { + if (!details.open) { + handleClose(); + } + }} + open={open} + size="xl" + > @@ -75,7 +90,7 @@ const BulkClearDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => + disabled={selectedDagRuns.length === 0} loading={isPending} onClick={() => { - void bulkClear(selectedDagRuns, { + clear(selectedDagRuns, { note, onlyFailed, onlyNew, diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx index 360fb9f0b6e5b..e3b52b38786a8 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx @@ -23,7 +23,9 @@ import { FiTrash2 } from "react-icons/fi"; import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ErrorAlert } from "src/components/ErrorAlert"; import { Dialog } from "src/components/ui"; -import { useBulkDagRuns } from "src/queries/useBulkDagRuns"; +import { useBulkDeleteDagRuns } from "src/queries/useBulkDeleteDagRuns"; + +import { BulkErrorList } from "./BulkErrorList"; type Props = { readonly clearSelections: VoidFunction; @@ -33,11 +35,16 @@ type Props = { const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => { const { t: translate } = useTranslation(); const { onClose, onOpen, open } = useDisclosure(); - const { bulkDelete, error, isPending } = useBulkDagRuns({ + const { actionErrors, error, isPending, remove, reset } = useBulkDeleteDagRuns({ clearSelections, onSuccessConfirm: onClose, }); + const handleClose = () => { + reset(); + onClose(); + }; + return ( <> - + { + if (!details.open) { + handleClose(); + } + }} + open={open} + size="xl" + > @@ -79,12 +94,13 @@ const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) => +