Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
"clearing the task instances.",
)
prevent_running_task: bool = False
note: Annotated[str, StringConstraints(max_length=1000)] | None = None

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9904,6 +9904,12 @@ components:
type: boolean
title: Prevent Running Task
default: false
note:
anyOf:
- type: string
maxLength: 1000
- type: 'null'
title: Note
additionalProperties: false
type: object
title: ClearTaskInstancesBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ def post_clear_task_instances(
dag_bag: DagBagDep,
body: ClearTaskInstancesBody,
session: SessionDep,
user: GetUserDep,
) -> TaskInstanceCollectionResponse:
"""Clear task instances."""
dag = get_latest_version_of_dag(dag_bag, dag_id, session)
Expand Down Expand Up @@ -837,6 +838,13 @@ def _collect_relatives(run_id: str, direction: Literal["upstream", "downstream"]
except AirflowClearRunningTaskException as e:
raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e

if body.note is not None:
_patch_task_instance_note(
task_instance_body=body,
tis=task_instances,
user=user,
)

return TaskInstanceCollectionResponse(
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in task_instances],
total_entries=len(task_instances),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
BulkDeleteAction,
BulkUpdateAction,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import BulkTaskInstanceBody, PatchTaskInstanceBody
from airflow.api_fastapi.core_api.datamodels.task_instances import (
BulkTaskInstanceBody,
ClearTaskInstancesBody,
PatchTaskInstanceBody,
)
from airflow.api_fastapi.core_api.security import GetUserDep
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.listeners.listener import get_listener_manager
Expand Down Expand Up @@ -139,7 +143,7 @@ def _patch_task_instance_state(


def _patch_task_instance_note(
task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
task_instance_body: BulkTaskInstanceBody | ClearTaskInstancesBody | PatchTaskInstanceBody,
tis: list[TI],
user: GetUserDep,
update_mask: list[str] | None = Query(None),
Expand Down Expand Up @@ -275,6 +279,7 @@ def _perform_update(
dag_bag=self.dag_bag,
body=entity,
session=self.session,
map_index=map_index,
update_mask=update_mask,
)

Expand Down Expand Up @@ -318,12 +323,12 @@ def handle_bulk_update(

try:
specific_entity_map = {
(entity.dag_id, entity.dag_run_id, entity.task_id, entity.map_index): entity
self._extract_task_identifiers(entity): entity
for entity in action.entities
if entity.map_index is not None
}
all_map_entity_map = {
(entity.dag_id, entity.dag_run_id, entity.task_id): entity
self._extract_task_identifiers(entity)[:3]: entity
for entity in action.entities
if entity.map_index is None
}
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,18 @@ export const $ClearTaskInstancesBody = {
type: 'boolean',
title: 'Prevent Running Task',
default: false
},
note: {
anyOf: [
{
type: 'string',
maxLength: 1000
},
{
type: 'null'
}
],
title: 'Note'
}
},
additionalProperties: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ export type ClearTaskInstancesBody = {
*/
run_on_latest_version?: boolean;
prevent_running_task?: boolean;
note?: string | null;
};

/**
Expand Down
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@
"users": "Users"
},
"selectLanguage": "Select Language",
"selected": "Selected",
"showDetailsPanel": "Show Details Panel",
"signedInAs": "Signed in as",
"source": {
Expand Down Expand Up @@ -293,13 +294,27 @@
"utc": "UTC (Coordinated Universal Time)"
},
"toaster": {
"bulkClear": {
"error": "Bulk Clear {{resourceName}} Request Failed",
"success": {
"description": "{{count}} {{resourceName}} have been successfully cleared. Keys: {{keys}}",
"title": "Bulk Clear {{resourceName}} Request Submitted"
}
},
"bulkDelete": {
"error": "Bulk Delete {{resourceName}} Request Failed",
"success": {
"description": "{{count}} {{resourceName}} have been successfully deleted. Keys: {{keys}}",
"title": "Bulk Delete {{resourceName}} Request Submitted"
}
},
"bulkUpdate": {
"error": "Bulk Update {{resourceName}} Request Failed",
"success": {
"description": "{{count}} {{resourceName}} have been successfully updated. Keys: {{keys}}",
"title": "Bulk Update {{resourceName}} Request Submitted"
}
},
"create": {
"error": "Create {{resourceName}} Request Failed",
"success": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import { Box, Editable, Text, VStack } from "@chakra-ui/react";
import type { ChangeEvent } from "react";
import { useTranslation } from "react-i18next";

import type { DAGRunResponse, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import type {
DAGRunResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
} from "openapi/requests/types.gen";
import ReactMarkdown from "src/components/ReactMarkdown";
import { Accordion } from "src/components/ui";

Expand All @@ -29,17 +33,59 @@ import { getColumns } from "./columns";

type Props = {
readonly affectedTasks?: TaskInstanceCollectionResponse;
readonly groupByRunId?: boolean;
readonly note: DAGRunResponse["note"];
readonly setNote: (value: string) => void;
};

const TasksTable = ({
noRowsMessage,
tasks,
}: {
readonly noRowsMessage: string;
readonly tasks: Array<TaskInstanceResponse>;
}) => {
const { t: translate } = useTranslation();
const columns = getColumns(translate);

return (
<DataTable
columns={columns}
data={tasks}
displayMode="table"
modelName="common:taskInstance"
noRowsMessage={noRowsMessage}
total={tasks.length}
/>
);
};

// Table is in memory, pagination and sorting are disabled.
// TODO: Make a front-end only unconnected table component with client side ordering and pagination
const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote }: Props) => {
const showTaskSection = affectedTasks !== undefined;
const { t: translate } = useTranslation();

const columns = getColumns(translate);
// Group task instances by dag_run_id when requested
const runGroups = (() => {
if (!groupByRunId || !affectedTasks) {
return undefined;
}

const map = new Map<string, Array<TaskInstanceResponse>>();

for (const ti of affectedTasks.task_instances) {
const group = map.get(ti.dag_run_id) ?? [];

group.push(ti);
map.set(ti.dag_run_id, group);
}

return map;
})();

// Only group when there are actually multiple run IDs
const shouldGroup = groupByRunId && runGroups !== undefined && runGroups.size > 1;

return (
<Accordion.Root
Expand All @@ -59,14 +105,33 @@ const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
</Accordion.ItemTrigger>
<Accordion.ItemContent>
<Box maxH="400px" overflowY="scroll">
<DataTable
columns={columns}
data={affectedTasks.task_instances}
displayMode="table"
modelName="common:taskInstance"
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
total={affectedTasks.total_entries}
/>
{shouldGroup ? (
<Accordion.Root collapsible multiple variant="plain">
{[...runGroups.entries()].map(([runId, tis]) => (
<Accordion.Item key={runId} value={runId}>
<Accordion.ItemTrigger px={2} py={1}>
<Text fontSize="sm" fontWeight="semibold">
{translate("runId")}: {runId}{" "}
<Text as="span" color="fg.subtle" fontWeight="normal">
({tis.length})
</Text>
</Text>
</Accordion.ItemTrigger>
<Accordion.ItemContent>
<TasksTable
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
tasks={tis}
/>
</Accordion.ItemContent>
</Accordion.Item>
))}
</Accordion.Root>
) : (
<TasksTable
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
tasks={affectedTasks.task_instances}
/>
)}
</Box>
</Accordion.ItemContent>
</Accordion.Item>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);

const [note, setNote] = useState<string>("");
const [note, setNote] = useState<string | null>(null);

const { data: dagDetails } = useDagServiceGetDagDetails({
dagId,
Expand Down Expand Up @@ -186,6 +186,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
include_future: future,
include_past: past,
include_upstream: upstream,
...(note === null ? {} : { note }),
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: groupTaskIds,
Expand Down
Loading
Loading