Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
from typing import Annotated
from uuid import UUID

from fastapi import Body, HTTPException, status
from fastapi import Body, Depends, HTTPException, status
from pydantic import JsonValue
from sqlalchemy import func, update
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from sqlalchemy.sql import select

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
PrevSuccessfulDagRunResponse,
TIDeferredStatePayload,
Expand Down Expand Up @@ -67,6 +69,7 @@
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload for the state transition"},
},
response_model_exclude_unset=True,
dependencies=[Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def ti_run(
task_instance_id: UUID, ti_run_payload: Annotated[TIEnterRunningPayload, Body()], session: SessionDep
Expand Down Expand Up @@ -247,6 +250,7 @@ def ti_run(
status.HTTP_409_CONFLICT: {"description": "The TI is already in the requested state"},
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload for the state transition"},
},
dependencies=[Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def ti_update_state(
task_instance_id: UUID,
Expand Down Expand Up @@ -401,6 +405,7 @@ def ti_update_state(
},
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload for the state transition"},
},
dependencies=[Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def ti_heartbeat(
task_instance_id: UUID,
Expand Down Expand Up @@ -465,6 +470,7 @@ def ti_heartbeat(
"description": "Invalid payload for the setting rendered task instance fields"
},
},
dependencies=[Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def ti_put_rtif(
task_instance_id: UUID,
Expand All @@ -489,6 +495,7 @@ def ti_put_rtif(
responses={
status.HTTP_404_NOT_FOUND: {"description": "Task Instance or Dag Run not found"},
},
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def get_previous_successful_dagrun(
task_instance_id: UUID, session: SessionDep
Expand Down Expand Up @@ -534,6 +541,7 @@ def get_previous_successful_dagrun(
"description": "Invalid payload for requested runtime checks on the Task Instance."
},
},
dependencies=[Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def ti_runtime_checks(
task_instance_id: UUID,
Expand Down
Loading