Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class DAGRunClearBody(StrictBaseModel):
"then the ``[core] rerun_with_latest_version`` config option, "
"and finally ``False`` (the historical default for clear/rerun).",
)
note: str | None = Field(
default=None,
max_length=1000,
)

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13021,6 +13021,12 @@ components:
after clearing the Dag Run. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version``
config option, and finally ``False`` (the historical default for clear/rerun).
note:
anyOf:
- type: string
maxLength: 1000
- type: 'null'
title: Note
additionalProperties: false
type: object
title: DAGRunClearBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def clear_dag_run(
body: DAGRunClearBody,
dag_bag: DagBagDep,
session: SessionDep,
user: GetUserDep,
) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
dag_run = session.scalar(
select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_model))
Expand Down Expand Up @@ -388,6 +389,12 @@ def clear_dag_run(
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
if not dag_run_cleared:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found after clearing")
if body.note is not None:
if dag_run_cleared.dag_run_note is None:
dag_run_cleared.note = (body.note, user.get_id())
else:
dag_run_cleared.dag_run_note.content = body.note
dag_run_cleared.dag_run_note.user_id = user.get_id()
return dag_run_cleared


Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2778,6 +2778,18 @@ export const $DAGRunClearBody = {
],
title: 'Run On Latest Version',
description: '(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun).'
},
note: {
anyOf: [
{
type: 'string',
maxLength: 1000
},
{
type: 'null'
}
],
title: 'Note'
}
},
additionalProperties: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ export type DAGRunClearBody = {
* (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun).
*/
run_on_latest_version?: boolean | null;
note?: string | null;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun";
import { useClearDagRun } from "src/queries/useClearRun";
import { usePatchDagRun } from "src/queries/usePatchDagRun";
import { isStatePending, useAutoRefresh } from "src/utils";

type Props = {
Expand Down Expand Up @@ -80,12 +79,6 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => {
onSuccessConfirm: onClose,
});

const { isPending: isPendingPatchDagRun, mutate: mutatePatchDagRun } = usePatchDagRun({
dagId,
dagRunId,
onSuccess: onClose,
});

// Check if DAG versions differ (works for both bundle-versioned and local bundles)
const latestDagVersionNumber = dagDetails?.latest_dag_version?.version_number;
const dagRunVersionNumber = dagRun.dag_versions.at(-1)?.version_number;
Expand Down Expand Up @@ -148,25 +141,19 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => {
) : undefined}
<Button
disabled={affectedTasks.total_entries === 0}
loading={isPending || isPendingPatchDagRun}
loading={isPending}
onClick={() => {
mutate({
dagId,
dagRunId,
requestBody: {
dry_run: false,
note: note === dagRun.note ? undefined : note,
only_failed: onlyFailed,
only_new: onlyNew,
run_on_latest_version: runOnLatestVersion,
},
});
if (note !== dagRun.note) {
mutatePatchDagRun({
dagId,
dagRunId,
requestBody: { note },
});
}
}}
>
<CgRedo /> {translate("modal.confirm")}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
import { useClearTaskInstances } from "src/queries/useClearTaskInstances";
import { useClearTaskInstancesDryRun } from "src/queries/useClearTaskInstancesDryRun";
import { usePatchTaskInstance } from "src/queries/usePatchTaskInstance";
import { isStatePending, useAutoRefresh } from "src/utils";

import ClearTaskInstanceConfirmationDialog from "./ClearTaskInstanceConfirmationDialog";
Expand Down Expand Up @@ -93,11 +92,6 @@ const ClearTaskInstanceDialog = (props: Props) => {
const [preventRunningTask, setPreventRunningTask] = useState(true);

const [note, setNote] = useState<string | null>(taskInstance?.note ?? null);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchTaskInstance } = usePatchTaskInstance({
dagId,
dagRunId,
taskId,
});

// Get current DAG's bundle version to compare with task instance's DAG version bundle version
const { data: dagDetails } = useDagServiceGetDagDetails({
Expand Down Expand Up @@ -229,11 +223,7 @@ const ClearTaskInstanceDialog = (props: Props) => {
>
{translate("dags:runAndTaskActions.options.preventRunningTasks")}
</Checkbox>
<Button
disabled={affectedTasks.total_entries === 0}
loading={isPending || isPendingPatchDagRun}
onClick={onOpen}
>
<Button disabled={affectedTasks.total_entries === 0} loading={isPending} onClick={onOpen}>
<CgRedo /> {translate("modal.confirm")}
</Button>
</Flex>
Expand All @@ -255,6 +245,8 @@ const ClearTaskInstanceDialog = (props: Props) => {
}}
onClose={onClose}
onConfirm={() => {
const noteChanged = note !== (taskInstance?.note ?? null);

mutate({
dagId,
requestBody: {
Expand All @@ -264,21 +256,13 @@ const ClearTaskInstanceDialog = (props: Props) => {
include_future: future,
include_past: past,
include_upstream: upstream,
note: noteChanged ? note : undefined,
Comment thread
pierrejeambrun marked this conversation as resolved.
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: allMapped ? [taskId] : [[taskId, mapIndex as number]],
...(preventRunningTask ? { prevent_running_task: true } : {}),
},
});
if (note !== (taskInstance?.note ?? null)) {
mutatePatchTaskInstance({
dagId,
dagRunId,
...(allMapped ? {} : { mapIndex }),
requestBody: { note },
taskId,
});
}
onCloseDialog();
}}
open={open}
Expand Down
21 changes: 1 addition & 20 deletions airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) =
dagRunId: dagRun.dag_run_id,
requestBody: {
dry_run: false,
note: options.note ?? undefined,
Comment thread
pierrejeambrun marked this conversation as resolved.
only_failed: options.onlyFailed,
only_new: options.onlyNew,
},
Expand All @@ -130,26 +131,6 @@ export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) =
}
});

if (succeeded.length > 0 && options.note !== null) {
const noteSettled = await Promise.allSettled(
succeeded
.filter((dagRun) => dagRun.note !== options.note)
.map((dagRun) =>
DagRunService.patchDagRun({
dagId: dagRun.dag_id,
dagRunId: dagRun.dag_run_id,
requestBody: { note: options.note },
}).then(() => dagRun),
),
);

noteSettled.forEach((outcome) => {
if (outcome.status === "rejected") {
errors.push({ error: `note: ${formatError(outcome.reason)}` });
}
});
}

await invalidateQueries(dagRuns);

if (succeeded.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,40 @@ def test_should_respond_403(self, unauthorized_test_client):
)
assert response.status_code == 403

@pytest.mark.parametrize(
("body", "expected_note"),
[
({"dry_run": False, "note": "cleared by test"}, "cleared by test"),
({"dry_run": False, "note": ""}, ""),
({"dry_run": False, "note": None}, "test_note"),
({"dry_run": False}, "test_note"),
],
ids=["set-new-note", "set-empty-note", "explicit-null-leaves-existing", "omit-leaves-existing"],
)
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_clear_dag_run_applies_note(self, test_client, session, body, expected_note):
"""``note`` in the clear body writes to the Dag Run; ``None`` / unset leaves it alone."""
response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body)
assert response.status_code == 200
assert response.json()["note"] == expected_note
dag_run = session.scalar(
select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID)
)
assert dag_run.note == expected_note

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_clear_dag_run_dry_run_does_not_apply_note(self, test_client, session):
"""``note`` is ignored on dry-run (no side effects)."""
response = test_client.post(
f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json={"dry_run": True, "note": "ignored"},
)
assert response.status_code == 200
dag_run = session.scalar(
select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID)
)
assert dag_run.note == "test_note"

@pytest.mark.parametrize(
("body", "dag_run_id", "expected_state"),
[
Expand Down
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class DAGRunClearBody(BaseModel):
title="Run On Latest Version",
),
] = None
note: Annotated[Note | None, Field(title="Note")] = None


class DAGSourceResponse(BaseModel):
Expand Down
Loading