diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 97b37fdaae5ea..8373847328d34 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -73,6 +73,10 @@ class DAGRunClearBody(StrictBaseModel): "then the ``[core] rerun_with_latest_version`` config option, " "and finally ``False`` (the historical default for clear/rerun).", ) + note: str | None = Field( + default=None, + max_length=1000, + ) @model_validator(mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 618356a1ce793..009e9c6f2f7e2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -13021,6 +13021,12 @@ components: after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun). + note: + anyOf: + - type: string + maxLength: 1000 + - type: 'null' + title: Note additionalProperties: false type: object title: DAGRunClearBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index a5c76550be559..aa82b14fefd58 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -319,6 +319,7 @@ def clear_dag_run( body: DAGRunClearBody, dag_bag: DagBagDep, session: SessionDep, + user: GetUserDep, ) -> ClearTaskInstanceCollectionResponse | DAGRunResponse: dag_run = session.scalar( select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_model)) @@ -388,6 +389,12 @@ def clear_dag_run( dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) if not dag_run_cleared: raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found after clearing") + if body.note is not None: + if dag_run_cleared.dag_run_note is None: + dag_run_cleared.note = (body.note, user.get_id()) + else: + dag_run_cleared.dag_run_note.content = body.note + dag_run_cleared.dag_run_note.user_id = user.get_id() return dag_run_cleared diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4a4b95f183023..27ce470d7ced1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2778,6 +2778,18 @@ export const $DAGRunClearBody = { ], title: 'Run On Latest Version', description: '(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun).' + }, + note: { + anyOf: [ + { + type: 'string', + maxLength: 1000 + }, + { + type: 'null' + } + ], + title: 'Note' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 77380eec73833..3f5ea4f962369 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -751,6 +751,7 @@ export type DAGRunClearBody = { * (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run. If not specified, falls back to the DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] rerun_with_latest_version`` config option, and finally ``False`` (the historical default for clear/rerun). */ run_on_latest_version?: boolean | null; + note?: string | null; }; /** diff --git a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx index 05f15856c6bfe..62710def9d4c0 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx @@ -29,7 +29,6 @@ import { Checkbox, Dialog } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun"; import { useClearDagRun } from "src/queries/useClearRun"; -import { usePatchDagRun } from "src/queries/usePatchDagRun"; import { isStatePending, useAutoRefresh } from "src/utils"; type Props = { @@ -80,12 +79,6 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { onSuccessConfirm: onClose, }); - const { isPending: isPendingPatchDagRun, mutate: mutatePatchDagRun } = usePatchDagRun({ - dagId, - dagRunId, - onSuccess: onClose, - }); - // Check if DAG versions differ (works for both bundle-versioned and local bundles) const latestDagVersionNumber = dagDetails?.latest_dag_version?.version_number; const dagRunVersionNumber = dagRun.dag_versions.at(-1)?.version_number; @@ -148,25 +141,19 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { ) : undefined} @@ -255,6 +245,8 @@ const ClearTaskInstanceDialog = (props: Props) => { }} onClose={onClose} onConfirm={() => { + const noteChanged = note !== (taskInstance?.note ?? null); + mutate({ dagId, requestBody: { @@ -264,21 +256,13 @@ const ClearTaskInstanceDialog = (props: Props) => { include_future: future, include_past: past, include_upstream: upstream, + note: noteChanged ? note : undefined, only_failed: onlyFailed, run_on_latest_version: runOnLatestVersion, task_ids: allMapped ? [taskId] : [[taskId, mapIndex as number]], ...(preventRunningTask ? { prevent_running_task: true } : {}), }, }); - if (note !== (taskInstance?.note ?? null)) { - mutatePatchTaskInstance({ - dagId, - dagRunId, - ...(allMapped ? {} : { mapIndex }), - requestBody: { note }, - taskId, - }); - } onCloseDialog(); }} open={open} diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts index 33288985fd9f1..b3caa1a77ac90 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts @@ -106,6 +106,7 @@ export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) = dagRunId: dagRun.dag_run_id, requestBody: { dry_run: false, + note: options.note ?? undefined, only_failed: options.onlyFailed, only_new: options.onlyNew, }, @@ -130,26 +131,6 @@ export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) = } }); - if (succeeded.length > 0 && options.note !== null) { - const noteSettled = await Promise.allSettled( - succeeded - .filter((dagRun) => dagRun.note !== options.note) - .map((dagRun) => - DagRunService.patchDagRun({ - dagId: dagRun.dag_id, - dagRunId: dagRun.dag_run_id, - requestBody: { note: options.note }, - }).then(() => dagRun), - ), - ); - - noteSettled.forEach((outcome) => { - if (outcome.status === "rejected") { - errors.push({ error: `note: ${formatError(outcome.reason)}` }); - } - }); - } - await invalidateQueries(dagRuns); if (succeeded.length > 0) { diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index bc59d204378fa..4ffcc12bc76af 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1693,6 +1693,40 @@ def test_should_respond_403(self, unauthorized_test_client): ) assert response.status_code == 403 + @pytest.mark.parametrize( + ("body", "expected_note"), + [ + ({"dry_run": False, "note": "cleared by test"}, "cleared by test"), + ({"dry_run": False, "note": ""}, ""), + ({"dry_run": False, "note": None}, "test_note"), + ({"dry_run": False}, "test_note"), + ], + ids=["set-new-note", "set-empty-note", "explicit-null-leaves-existing", "omit-leaves-existing"], + ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_clear_dag_run_applies_note(self, test_client, session, body, expected_note): + """``note`` in the clear body writes to the Dag Run; ``None`` / unset leaves it alone.""" + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body) + assert response.status_code == 200 + assert response.json()["note"] == expected_note + dag_run = session.scalar( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID) + ) + assert dag_run.note == expected_note + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_clear_dag_run_dry_run_does_not_apply_note(self, test_client, session): + """``note`` is ignored on dry-run (no side effects).""" + response = test_client.post( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", + json={"dry_run": True, "note": "ignored"}, + ) + assert response.status_code == 200 + dag_run = session.scalar( + select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == DAG1_RUN1_ID) + ) + assert dag_run.note == "test_note" + @pytest.mark.parametrize( ("body", "dag_run_id", "expected_state"), [ diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index f05fa65cf56f8..666b2bb19c70f 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -369,6 +369,7 @@ class DAGRunClearBody(BaseModel): title="Run On Latest Version", ), ] = None + note: Annotated[Note | None, Field(title="Note")] = None class DAGSourceResponse(BaseModel):