From 9bd0bffc3b204b9d474eb8f6e5097bb910cf2026 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Sun, 4 May 2025 15:57:15 +0800 Subject: [PATCH 1/2] Add TI deletion endpoint --- .../openapi/v1-rest-api-generated.yaml | 66 +++++++++ .../core_api/routes/public/task_instances.py | 34 +++++ .../airflow/ui/openapi-gen/queries/common.ts | 3 + .../airflow/ui/openapi-gen/queries/queries.ts | 51 +++++++ .../ui/openapi-gen/requests/services.gen.ts | 36 +++++ .../ui/openapi-gen/requests/types.gen.ts | 34 +++++ .../routes/public/test_task_instances.py | 137 ++++++++++++++++++ 7 files changed, 361 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index 56acc10aba0d2..033d18fb34dcb 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -4573,6 +4573,72 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Task Instance + summary: Delete Task Instance + description: Delete a task instance. + operationId: delete_task_instance + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + type: 'null' + title: Response Delete Task Instance + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped: get: tags: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index f46facc3450e7..da436a7d71339 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -928,3 +928,37 @@ def patch_task_instance( session.commit() return TaskInstanceResponse.model_validate(ti) + + +@task_instances_router.delete( + task_instances_prefix + "/{task_id}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def delete_task_instance( + dag_id: str, + dag_run_id: str, + task_id: str, + session: SessionDep, + map_index: int = -1, +) -> None: + """Delete a task instance.""" + query = select(TI).where( + TI.dag_id == dag_id, + TI.run_id == dag_run_id, + TI.task_id == task_id, + ) + + if map_index >= 0: + query = query.where(TI.map_index == map_index) + else: + query = query.where(TI.map_index == -1) + + task_instance = session.scalar(query) + if task_instance is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Task Instance id {task_id} not found in dag {dag_id} run {dag_run_id}", + ) + + session.delete(task_instance) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 4e6a91683c1e5..c0e71a471280b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1943,6 +1943,9 @@ export type ConnectionServiceDeleteConnectionMutationResult = Awaited< >; export type DagRunServiceDeleteDagRunMutationResult = Awaited>; export type DagServiceDeleteDagMutationResult = Awaited>; +export type TaskInstanceServiceDeleteTaskInstanceMutationResult = Awaited< + ReturnType +>; export type PoolServiceDeletePoolMutationResult = Awaited>; export type VariableServiceDeleteVariableMutationResult = Awaited< ReturnType diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 14cbb6361b0ad..5ef91fcd933f4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -4738,6 +4738,57 @@ export const useDagServiceDeleteDag = < }, TContext >({ mutationFn: ({ dagId }) => DagService.deleteDag({ dagId }) as unknown as Promise, ...options }); +/** + * Delete Task Instance + * Delete a task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns null Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceDeleteTaskInstance = < + TData = Common.TaskInstanceServiceDeleteTaskInstanceMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + }, + TContext + >({ + mutationFn: ({ dagId, dagRunId, mapIndex, taskId }) => + TaskInstanceService.deleteTaskInstance({ + dagId, + dagRunId, + mapIndex, + taskId, + }) as unknown as Promise, + ...options, + }); /** * Delete Pool * Delete a pool entry. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 68d167071e9e4..479c8b6c36d3d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -116,6 +116,8 @@ import type { GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, + DeleteTaskInstanceData, + DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, @@ -1950,6 +1952,40 @@ export class TaskInstanceService { }); } + /** + * Delete Task Instance + * Delete a task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @returns null Successful Response + * @throws ApiError + */ + public static deleteTaskInstance( + data: DeleteTaskInstanceData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "DELETE", + url: "/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + }, + query: { + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Mapped Task Instances * Get list of mapped task instances. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 16a5ef98373c8..86e71994465dd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2237,6 +2237,15 @@ export type PatchTaskInstanceData = { export type PatchTaskInstanceResponse = TaskInstanceResponse; +export type DeleteTaskInstanceData = { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; +}; + +export type DeleteTaskInstanceResponse = null; + export type GetMappedTaskInstancesData = { dagId: string; dagRunId: string; @@ -4269,6 +4278,31 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + delete: { + req: DeleteTaskInstanceData; + res: { + /** + * Successful Response + */ + 200: null; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; "/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped": { get: { diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index af2d23014061b..b0da38eb36f7d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -4006,3 +4006,140 @@ def test_should_return_empty_list_for_updating_same_task_instance_state( ) assert response.status_code == 200 assert response.json() == {"task_instances": [], "total_entries": 0} + + +class TestDeleteTaskInstance(TestTaskInstanceEndpoint): + ENDPOINT_URL = "/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context" + DAG_ID = "example_python_operator" + TASK_ID = "print_the_context" + RUN_ID = "TEST_DAG_RUN_ID" + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.delete(self.ENDPOINT_URL) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.delete(self.ENDPOINT_URL) + assert response.status_code == 403 + + @pytest.mark.parametrize( + "test_url, setup_needed, expected_error", + [ + ( + "/dags/non_existent_dag/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", + False, + "Task Instance id print_the_context not found in dag non_existent_dag run TEST_DAG_RUN_ID", + ), + ( + "/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/non_existent_task", + True, + "Task Instance id non_existent_task not found in dag example_python_operator run TEST_DAG_RUN_ID", + ), + ( + "/dags/example_python_operator/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/print_the_context", + True, + "Task Instance id print_the_context not found in dag example_python_operator run NON_EXISTENT_DAG_RUN", + ), + ], + ) + def test_should_respond_404_for_non_existent_resources( + self, test_client, session, test_url, setup_needed, expected_error + ): + if setup_needed: + self.create_task_instances(session) + response = test_client.delete(test_url) + assert response.status_code == 404 + assert response.json()["detail"] == expected_error + + @pytest.mark.parametrize( + "task_instances, map_index, expected_status_code, expected_remaining", + [ + pytest.param( + [{"task_id": TASK_ID, "state": State.SUCCESS}], + -1, + 200, + None, + id="normal-success-state", + ), + pytest.param( + [{"task_id": TASK_ID, "state": State.RUNNING}], + -1, + 200, + None, + id="normal-running-state", + ), + pytest.param( + [{"task_id": TASK_ID, "state": State.FAILED}], + -1, + 200, + None, + id="normal-failed-state", + ), + pytest.param( + [ + {"task_id": TASK_ID, "map_index": 1}, + {"task_id": TASK_ID, "map_index": 2}, + {"task_id": TASK_ID, "map_index": 3}, + ], + 2, + 200, + {1, 3}, + id="mapped-task-deletion", + ), + pytest.param( + [{"task_id": TASK_ID}], + 1, + 404, + set(), + id="non-mapped-task-with-map-index", + ), + ], + ) + def test_should_handle_task_instance_deletion( + self, + test_client, + session, + task_instances, + map_index, + expected_status_code, + expected_remaining, + ): + self.create_task_instances(session, task_instances=task_instances) + + base_query = session.query(TaskInstance).filter( + TaskInstance.dag_id == self.DAG_ID, + TaskInstance.task_id == self.TASK_ID, + TaskInstance.run_id == self.RUN_ID, + ) + + if map_index == -1: + initial_ti = base_query.filter(TaskInstance.map_index == -1).first() + assert initial_ti is not None + else: + initial_tis = base_query.filter(TaskInstance.map_index != -1).all() + if any(isinstance(ti, dict) and "map_index" in ti for ti in task_instances): + expected_map_indexes = {ti["map_index"] for ti in task_instances if "map_index" in ti} + actual_map_indexes = {ti.map_index for ti in initial_tis} + assert actual_map_indexes == expected_map_indexes + else: + assert len(initial_tis) == 0 + + response = test_client.delete( + self.ENDPOINT_URL, + params={"map_index": map_index} if map_index != -1 else None, + ) + assert response.status_code == expected_status_code + + if expected_status_code == 404: + assert ( + response.json()["detail"] + == f"Task Instance id {self.TASK_ID} not found in dag {self.DAG_ID} run {self.RUN_ID}" + ) + else: + if map_index == -1: + deleted_ti = base_query.filter(TaskInstance.map_index == -1).first() + assert deleted_ti is None + else: + remaining_tis = base_query.filter(TaskInstance.map_index != -1).all() + if expected_remaining is not None: + assert set(ti.map_index for ti in remaining_tis) == expected_remaining From c67157137ca673842798fbb5543c7ef960cad799 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Mon, 5 May 2025 22:43:25 +0800 Subject: [PATCH 2/2] Fix error message --- .../core_api/routes/public/task_instances.py | 27 +++++++---------- .../routes/public/test_task_instances.py | 30 +++++++++---------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index da436a7d71339..fa2387f63b947 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -240,16 +240,14 @@ def get_task_instance_dependencies( ) -> TaskDependencyCollectionResponse: """Get dependencies blocking task from getting scheduled.""" query = select(TI).where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id) - - if map_index == -1: - query = query.where(TI.map_index == -1) - else: - query = query.where(TI.map_index == map_index) + query = query.where(TI.map_index == map_index) result = session.execute(query).one_or_none() if result is None: - error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}" + error_message = ( + f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found", + ) raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) ti = result[0] @@ -754,10 +752,7 @@ def _patch_ti_validate_request( .join(TI.dag_run) .options(joinedload(TI.rendered_task_instance_fields)) ) - if map_index == -1: - query = query.where(or_(TI.map_index == -1, TI.map_index is None)) - else: - query = query.where(TI.map_index == map_index) + query = query.where(TI.map_index == map_index) try: ti = session.scalar(query) @@ -767,7 +762,9 @@ def _patch_ti_validate_request( "Multiple task instances found. As the TI is mapped, add the map_index value to the URL", ) - err_msg_404 = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}" + err_msg_404 = ( + f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found", + ) if ti is None: raise HTTPException(status.HTTP_404_NOT_FOUND, err_msg_404) @@ -949,16 +946,12 @@ def delete_task_instance( TI.task_id == task_id, ) - if map_index >= 0: - query = query.where(TI.map_index == map_index) - else: - query = query.where(TI.map_index == -1) - + query = query.where(TI.map_index == map_index) task_instance = session.scalar(query) if task_instance is None: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"Task Instance id {task_id} not found in dag {dag_id} run {dag_run_id}", + f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found", ) session.delete(task_instance) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index b0da38eb36f7d..f75d8cfdab2be 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -3215,10 +3215,9 @@ def test_should_respond_403(self, unauthorized_test_client): "error, code, payload", [ [ - ( - "Task Instance not found for dag_id=example_python_operator" - ", run_id=TEST_DAG_RUN_ID, task_id=print_the_context" - ), + [ + "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", + ], 404, { "new_state": "failed", @@ -3784,10 +3783,9 @@ def test_should_not_update_mapped_task_instance(self, test_client, session): "error, code, payload", [ [ - ( - "Task Instance not found for dag_id=example_python_operator" - ", run_id=TEST_DAG_RUN_ID, task_id=print_the_context" - ), + [ + "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found" + ], 404, { "new_state": "failed", @@ -4009,10 +4007,10 @@ def test_should_return_empty_list_for_updating_same_task_instance_state( class TestDeleteTaskInstance(TestTaskInstanceEndpoint): - ENDPOINT_URL = "/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context" DAG_ID = "example_python_operator" TASK_ID = "print_the_context" RUN_ID = "TEST_DAG_RUN_ID" + ENDPOINT_URL = f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}" def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.delete(self.ENDPOINT_URL) @@ -4026,19 +4024,19 @@ def test_should_respond_403(self, unauthorized_test_client): "test_url, setup_needed, expected_error", [ ( - "/dags/non_existent_dag/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", + f"/dags/non_existent_dag/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}", False, - "Task Instance id print_the_context not found in dag non_existent_dag run TEST_DAG_RUN_ID", + "The Task Instance with dag_id: `non_existent_dag`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", ), ( - "/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/non_existent_task", + f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/non_existent_task", True, - "Task Instance id non_existent_task not found in dag example_python_operator run TEST_DAG_RUN_ID", + "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found", ), ( - "/dags/example_python_operator/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/print_the_context", + f"/dags/{DAG_ID}/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/{TASK_ID}", True, - "Task Instance id print_the_context not found in dag example_python_operator run NON_EXISTENT_DAG_RUN", + "The Task Instance with dag_id: `example_python_operator`, run_id: `NON_EXISTENT_DAG_RUN`, task_id: `print_the_context` and map_index: `-1` was not found", ), ], ) @@ -4133,7 +4131,7 @@ def test_should_handle_task_instance_deletion( if expected_status_code == 404: assert ( response.json()["detail"] - == f"Task Instance id {self.TASK_ID} not found in dag {self.DAG_ID} run {self.RUN_ID}" + == f"The Task Instance with dag_id: `{self.DAG_ID}`, run_id: `{self.RUN_ID}`, task_id: `{self.TASK_ID}` and map_index: `{map_index}` was not found" ) else: if map_index == -1: