diff --git a/airflow/api_connexion/endpoints/task_endpoint.py b/airflow/api_connexion/endpoints/task_endpoint.py index abc28cfee6fbb..3fd14c8cdf892 100644 --- a/airflow/api_connexion/endpoints/task_endpoint.py +++ b/airflow/api_connexion/endpoints/task_endpoint.py @@ -47,6 +47,7 @@ def get_task(*, dag_id: str, task_id: str) -> APIResponse: return task_schema.dump(task) +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.TASK) def get_tasks(*, dag_id: str, order_by: str = "task_id") -> APIResponse: """Get tasks for DAG.""" diff --git a/airflow/api_fastapi/common/types.py b/airflow/api_fastapi/common/types.py index 2dc1be7d4cf0b..ab10a21c97002 100644 --- a/airflow/api_fastapi/common/types.py +++ b/airflow/api_fastapi/common/types.py @@ -16,14 +16,11 @@ # under the License. from __future__ import annotations -import inspect from datetime import timedelta from typing import Annotated from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, BeforeValidator, ConfigDict -from airflow.models.mappedoperator import MappedOperator -from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils import timezone UtcDateTime = Annotated[AwareDatetime, AfterValidator(lambda d: d.astimezone(timezone.utc))] @@ -59,28 +56,3 @@ class TimeDelta(BaseModel): TimeDeltaWithValidation = Annotated[TimeDelta, BeforeValidator(_validate_timedelta_field)] - - -def get_class_ref(obj) -> dict[str, str | None]: - """Return the class_ref dict for obj.""" - is_mapped_or_serialized = isinstance(obj, (MappedOperator, SerializedBaseOperator)) - - module_path = None - if is_mapped_or_serialized: - module_path = obj._task_module - else: - module_type = inspect.getmodule(obj) - module_path = module_type.__name__ if module_type else None - - class_name = None - if is_mapped_or_serialized: - class_name = obj._task_type - elif obj.__class__ is type: - class_name = obj.__name__ - else: - class_name = type(obj).__name__ - - return { - "module_path": module_path, - "class_name": class_name, - } diff --git a/airflow/api_fastapi/core_api/datamodels/tasks.py b/airflow/api_fastapi/core_api/datamodels/tasks.py index 7caaf9c02f473..9b962390cc342 100644 --- a/airflow/api_fastapi/core_api/datamodels/tasks.py +++ b/airflow/api_fastapi/core_api/datamodels/tasks.py @@ -17,16 +17,44 @@ from __future__ import annotations +import inspect from collections import abc from datetime import datetime +from typing import Any -from pydantic import BaseModel, computed_field, field_validator +from pydantic import BaseModel, computed_field, field_validator, model_validator from airflow.api_fastapi.common.types import TimeDeltaWithValidation -from airflow.serialization.serialized_objects import encode_priority_weight_strategy +from airflow.models.mappedoperator import MappedOperator +from airflow.serialization.serialized_objects import SerializedBaseOperator, encode_priority_weight_strategy from airflow.task.priority_strategy import PriorityWeightStrategy +def _get_class_ref(obj) -> dict[str, str | None]: + """Return the class_ref dict for obj.""" + is_mapped_or_serialized = isinstance(obj, (MappedOperator, SerializedBaseOperator)) + + module_path = None + if is_mapped_or_serialized: + module_path = obj._task_module + else: + module_type = inspect.getmodule(obj) + module_path = module_type.__name__ if module_type else None + + class_name = None + if is_mapped_or_serialized: + class_name = obj._task_type + elif obj.__class__ is type: + class_name = obj.__name__ + else: + class_name = type(obj).__name__ + + return { + "module_path": module_path, + "class_name": class_name, + } + + class TaskResponse(BaseModel): """Task serializer for responses.""" @@ -57,6 +85,14 @@ class TaskResponse(BaseModel): class_ref: dict | None is_mapped: bool | None + @model_validator(mode="before") + @classmethod + def validate_model(cls, task: Any) -> Any: + task.__dict__.update( + {"class_ref": _get_class_ref(task), "is_mapped": isinstance(task, MappedOperator)} + ) + return task + @field_validator("weight_rule", mode="before") @classmethod def validate_weight_rule(cls, wr: str | PriorityWeightStrategy | None) -> str | None: @@ -81,3 +117,10 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None: def extra_links(self) -> list[str]: """Extract and return extra_links.""" return getattr(self, "operator_extra_links", []) + + +class TaskCollectionResponse(BaseModel): + """Task collection serializer for responses.""" + + tasks: list[TaskResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 890fb6b6c8d0a..ab7e36065e3bf 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3145,6 +3145,64 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/tasks/: + get: + tags: + - Task + summary: Get Tasks + description: Get tasks for DAG. + operationId: get_tasks + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: order_by + in: query + required: false + schema: + type: string + default: task_id + title: Order By + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks/{task_id}: get: tags: @@ -5478,6 +5536,22 @@ components: - latest_scheduler_heartbeat title: SchedulerInfoSchema description: Schema for Scheduler info. + TaskCollectionResponse: + properties: + tasks: + items: + $ref: '#/components/schemas/TaskResponse' + type: array + title: Tasks + total_entries: + type: integer + title: Total Entries + type: object + required: + - tasks + - total_entries + title: TaskCollectionResponse + description: Task collection serializer for responses. TaskDependencyCollectionResponse: properties: dependencies: diff --git a/airflow/api_fastapi/core_api/routes/public/tasks.py b/airflow/api_fastapi/core_api/routes/public/tasks.py index 574d2fc7b782d..a8a366cf6df00 100644 --- a/airflow/api_fastapi/core_api/routes/public/tasks.py +++ b/airflow/api_fastapi/core_api/routes/public/tasks.py @@ -17,26 +17,52 @@ from __future__ import annotations +from operator import attrgetter + from fastapi import HTTPException, Request, status from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.common.types import get_class_ref -from airflow.api_fastapi.core_api.datamodels.tasks import TaskResponse +from airflow.api_fastapi.core_api.datamodels.tasks import TaskCollectionResponse, TaskResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.exceptions import TaskNotFound from airflow.models import DAG -from airflow.models.mappedoperator import MappedOperator tasks_router = AirflowRouter(tags=["Task"], prefix="/dags/{dag_id}/tasks") +@tasks_router.get( + "/", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_tasks( + dag_id: str, + request: Request, + order_by: str = "task_id", +) -> TaskCollectionResponse: + """Get tasks for DAG.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + try: + tasks = sorted(dag.tasks, key=attrgetter(order_by.lstrip("-")), reverse=(order_by[0:1] == "-")) + except AttributeError as err: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(err)) + return TaskCollectionResponse( + tasks=[TaskResponse.model_validate(task, from_attributes=True) for task in tasks], + total_entries=(len(tasks)), + ) + + @tasks_router.get( "/{task_id}", responses=create_openapi_http_exception_doc( [ status.HTTP_400_BAD_REQUEST, - status.HTTP_401_UNAUTHORIZED, - status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND, ] ), @@ -48,9 +74,6 @@ def get_task(dag_id: str, task_id, request: Request) -> TaskResponse: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") try: task = dag.get_task(task_id=task_id) - task.__dict__.update( - {"class_ref": get_class_ref(task), "is_mapped": isinstance(task, MappedOperator)} - ) except TaskNotFound: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with id {task_id} was not found") return TaskResponse.model_validate(task, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 7758a94a410ea..bc9d95f87b966 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -931,6 +931,24 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ( }, ]), ]; +export type TaskServiceGetTasksDefaultResponse = Awaited< + ReturnType +>; +export type TaskServiceGetTasksQueryResult< + TData = TaskServiceGetTasksDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskServiceGetTasksKey = "TaskServiceGetTasks"; +export const UseTaskServiceGetTasksKeyFn = ( + { + dagId, + orderBy, + }: { + dagId: string; + orderBy?: string; + }, + queryKey?: Array, +) => [useTaskServiceGetTasksKey, ...(queryKey ?? [{ dagId, orderBy }])]; export type TaskServiceGetTaskDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index d690d87a1b2ef..ac1cd93db37c8 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1264,6 +1264,29 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = ( updatedAtLte, }), }); +/** + * Get Tasks + * Get tasks for DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.orderBy + * @returns TaskCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskServiceGetTasks = ( + queryClient: QueryClient, + { + dagId, + orderBy, + }: { + dagId: string; + orderBy?: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }), + queryFn: () => TaskService.getTasks({ dagId, orderBy }), + }); /** * Get Task * Get simplified representation of a task. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 3b57095183abd..55d758b063846 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1507,6 +1507,35 @@ export const useTaskInstanceServiceGetTaskInstances = < }) as TData, ...options, }); +/** + * Get Tasks + * Get tasks for DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.orderBy + * @returns TaskCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskServiceGetTasks = < + TData = Common.TaskServiceGetTasksDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + orderBy, + }: { + dagId: string; + orderBy?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey), + queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData, + ...options, + }); /** * Get Task * Get simplified representation of a task. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 9a1d7a81503e5..0c162cc42cd58 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1492,6 +1492,35 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = < }) as TData, ...options, }); +/** + * Get Tasks + * Get tasks for DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.orderBy + * @returns TaskCollectionResponse Successful Response + * @throws ApiError + */ +export const useTaskServiceGetTasksSuspense = < + TData = Common.TaskServiceGetTasksDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + orderBy, + }: { + dagId: string; + orderBy?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey), + queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData, + ...options, + }); /** * Get Task * Get simplified representation of a task. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 8a8a50bb7437a..6c506596baf86 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2882,6 +2882,26 @@ export const $SchedulerInfoSchema = { description: "Schema for Scheduler info.", } as const; +export const $TaskCollectionResponse = { + properties: { + tasks: { + items: { + $ref: "#/components/schemas/TaskResponse", + }, + type: "array", + title: "Tasks", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["tasks", "total_entries"], + title: "TaskCollectionResponse", + description: "Task collection serializer for responses.", +} as const; + export const $TaskDependencyCollectionResponse = { properties: { dependencies: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2cd106a8e0869..a4b5745056ad9 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -99,6 +99,8 @@ import type { GetMappedTaskInstanceResponse, GetTaskInstancesData, GetTaskInstancesResponse, + GetTasksData, + GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, @@ -1667,6 +1669,37 @@ export class TaskInstanceService { } export class TaskService { + /** + * Get Tasks + * Get tasks for DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.orderBy + * @returns TaskCollectionResponse Successful Response + * @throws ApiError + */ + public static getTasks( + data: GetTasksData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/tasks/", + path: { + dag_id: data.dagId, + }, + query: { + order_by: data.orderBy, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Task * Get simplified representation of a task. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 35848f3a88f97..53a7f3cef7f8c 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -710,6 +710,14 @@ export type SchedulerInfoSchema = { latest_scheduler_heartbeat: string | null; }; +/** + * Task collection serializer for responses. + */ +export type TaskCollectionResponse = { + tasks: Array; + total_entries: number; +}; + /** * Task scheduling dependencies collection serializer for responses. */ @@ -1375,6 +1383,13 @@ export type GetTaskInstancesData = { export type GetTaskInstancesResponse = TaskInstanceCollectionResponse; +export type GetTasksData = { + dagId: string; + orderBy?: string; +}; + +export type GetTasksResponse = TaskCollectionResponse; + export type GetTaskData = { dagId: string; taskId: unknown; @@ -2703,6 +2718,37 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/tasks/": { + get: { + req: GetTasksData; + res: { + /** + * Successful Response + */ + 200: TaskCollectionResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks/{task_id}": { get: { req: GetTaskData; diff --git a/tests/api_fastapi/core_api/routes/public/test_tasks.py b/tests/api_fastapi/core_api/routes/public/test_tasks.py index 44d5b98490675..a6bff8c9f356b 100644 --- a/tests/api_fastapi/core_api/routes/public/test_tasks.py +++ b/tests/api_fastapi/core_api/routes/public/test_tasks.py @@ -45,6 +45,7 @@ class TestTaskEndpoint: unscheduled_task_id2 = "unscheduled_task_2" task1_start_date = datetime(2020, 6, 15) task2_start_date = datetime(2020, 6, 16) + api_prefix = "/public/dags" def create_dags(self, test_client): with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date, doc_md="details") as dag: @@ -128,7 +129,7 @@ def test_should_respond_200(self, test_client): "doc_md": None, } response = test_client.get( - f"/public/dags/{self.dag_id}/tasks/{self.task_id}", + f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}", ) assert response.status_code == 200 assert response.json() == expected @@ -164,7 +165,7 @@ def test_mapped_task(self, test_client): "doc_md": None, } response = test_client.get( - f"/public/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}", + f"{self.api_prefix}/{self.mapped_dag_id}/tasks/{self.mapped_task_id}", ) assert response.status_code == 200 assert response.json() == expected @@ -215,7 +216,7 @@ def test_unscheduled_task(self, test_client): } for task_id, downstream_task_id in downstream_dict.items(): response = test_client.get( - f"/public/dags/{self.unscheduled_dag_id}/tasks/{task_id}", + f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks/{task_id}", ) assert response.status_code == 200 expected["downstream_task_ids"] = [downstream_task_id] if downstream_task_id else [] @@ -273,7 +274,7 @@ def test_should_respond_200_serialized(self, test_client): "doc_md": None, } response = test_client.get( - f"/public/dags/{self.dag_id}/tasks/{self.task_id}", + f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}", ) assert response.status_code == 200 assert response.json() == expected @@ -282,13 +283,254 @@ def test_should_respond_200_serialized(self, test_client): def test_should_respond_404(self, test_client): task_id = "xxxx_not_existing" response = test_client.get( - f"/public/dags/{self.dag_id}/tasks/{task_id}", + f"{self.api_prefix}/{self.dag_id}/tasks/{task_id}", ) assert response.status_code == 404 def test_should_respond_404_when_dag_not_found(self, test_client): dag_id = "xxxx_not_existing" response = test_client.get( - f"/public/dags/{dag_id}/tasks/{self.task_id}", + f"{self.api_prefix}/{dag_id}/tasks/{self.task_id}", ) assert response.status_code == 404 + + +class TestGetTasks(TestTaskEndpoint): + def test_should_respond_200(self, test_client): + expected = { + "tasks": [ + { + "class_ref": { + "class_name": "EmptyOperator", + "module_path": "airflow.operators.empty", + }, + "depends_on_past": False, + "downstream_task_ids": [self.task_id2], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "operator_name": "EmptyOperator", + "owner": "airflow", + "params": { + "foo": { + "__class": "airflow.models.param.Param", + "value": "bar", + "description": None, + "schema": {}, + } + }, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00Z", + "task_id": "op1", + "task_display_name": "op1", + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "is_mapped": False, + "doc_md": None, + }, + { + "class_ref": { + "class_name": "EmptyOperator", + "module_path": "airflow.operators.empty", + }, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "operator_name": "EmptyOperator", + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0}, + "retry_exponential_backoff": False, + "start_date": "2020-06-16T00:00:00Z", + "task_id": self.task_id2, + "task_display_name": self.task_id2, + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "is_mapped": False, + "doc_md": None, + }, + ], + "total_entries": 2, + } + response = test_client.get(f"{self.api_prefix}/{self.dag_id}/tasks") + assert response.status_code == 200 + assert response.json() == expected + + def test_get_tasks_mapped(self, test_client): + expected = { + "tasks": [ + { + "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"}, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "is_mapped": True, + "operator_name": "EmptyOperator", + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00Z", + "task_id": "mapped_task", + "task_display_name": "mapped_task", + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "doc_md": None, + }, + { + "class_ref": { + "class_name": "EmptyOperator", + "module_path": "airflow.operators.empty", + }, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "operator_name": "EmptyOperator", + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00Z", + "task_id": self.task_id3, + "task_display_name": self.task_id3, + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "is_mapped": False, + "doc_md": None, + }, + ], + "total_entries": 2, + } + response = test_client.get(f"{self.api_prefix}/{self.mapped_dag_id}/tasks") + assert response.status_code == 200 + assert response.json() == expected + + def test_get_unscheduled_tasks(self, test_client): + downstream_dict = { + self.unscheduled_task_id1: self.unscheduled_task_id2, + self.unscheduled_task_id2: None, + } + expected = { + "tasks": [ + { + "class_ref": { + "class_name": "EmptyOperator", + "module_path": "airflow.operators.empty", + }, + "depends_on_past": False, + "downstream_task_ids": [downstream_task_id] if downstream_task_id else [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "operator_name": "EmptyOperator", + "owner": "airflow", + "params": { + "is_unscheduled": { + "__class": "airflow.models.param.Param", + "value": True, + "description": None, + "schema": {}, + } + }, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0}, + "retry_exponential_backoff": False, + "start_date": None, + "task_id": task_id, + "task_display_name": task_id, + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "is_mapped": False, + "doc_md": None, + } + for (task_id, downstream_task_id) in downstream_dict.items() + ], + "total_entries": len(downstream_dict), + } + response = test_client.get(f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks") + assert response.status_code == 200 + assert response.json() == expected + + def test_should_respond_200_ascending_order_by_start_date(self, test_client): + response = test_client.get( + f"{self.api_prefix}/{self.dag_id}/tasks?order_by=start_date", + ) + assert response.status_code == 200 + assert self.task1_start_date < self.task2_start_date + assert response.json()["tasks"][0]["task_id"] == self.task_id + assert response.json()["tasks"][1]["task_id"] == self.task_id2 + + def test_should_respond_200_descending_order_by_start_date(self, test_client): + response = test_client.get( + f"{self.api_prefix}/{self.dag_id}/tasks?order_by=-start_date", + ) + assert response.status_code == 200 + # - means is descending + assert self.task1_start_date < self.task2_start_date + assert response.json()["tasks"][0]["task_id"] == self.task_id2 + assert response.json()["tasks"][1]["task_id"] == self.task_id + + def test_should_raise_400_for_invalid_order_by_name(self, test_client): + response = test_client.get( + f"{self.api_prefix}/{self.dag_id}/tasks?order_by=invalid_task_colume_name", + ) + assert response.status_code == 400 + assert ( + response.json()["detail"] == "'EmptyOperator' object has no attribute 'invalid_task_colume_name'" + ) + + def test_should_respond_404(self, test_client): + dag_id = "xxxx_not_existing" + response = test_client.get(f"{self.api_prefix}/{dag_id}/tasks") + assert response.status_code == 404