diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py new file mode 100644 index 0000000000000..379d780290938 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from pydantic import Field + +from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel + + +class AssetStateResponse(BaseModel): + """A single asset state key/value pair with metadata.""" + + key: str + value: str + updated_at: datetime + + +class AssetStateCollectionResponse(BaseModel): + """All asset state entries for an asset.""" + + asset_states: list[AssetStateResponse] + total_entries: int + + +class AssetStateBody(StrictBaseModel): + """Request body for setting an asset state value.""" + + value: str = Field(max_length=65535) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py new file mode 100644 index 0000000000000..856de74a0877b --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from pydantic import Field + +from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel + + +class TaskStateResponse(BaseModel): + """A single task state key/value pair with metadata.""" + + key: str + value: str + updated_at: datetime + expires_at: datetime | None + + +class TaskStateCollectionResponse(BaseModel): + """All task state entries for a task instance.""" + + task_states: list[TaskStateResponse] + total_entries: int + + +class TaskStateBody(StrictBaseModel): + """Request body for setting a task state value.""" + + value: str = Field(max_length=65535) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..c647e4e5ad73e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -5529,6 +5529,649 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/assets/{asset_id}/states: + get: + tags: + - Asset State + summary: List Asset States + description: List all state entries for an asset. + operationId: list_asset_states + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: asset_id + in: path + required: true + schema: + type: integer + title: Asset Id + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetStateCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Asset State + summary: Clear Asset State + description: Delete all state keys for an asset. + operationId: clear_asset_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: asset_id + in: path + required: true + schema: + type: integer + title: Asset Id + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /api/v2/assets/{asset_id}/states/{key}: + get: + tags: + - Asset State + summary: Get Asset State + description: Get a single asset state entry. + operationId: get_asset_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: asset_id + in: path + required: true + schema: + type: integer + title: Asset Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetStateResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + put: + tags: + - Asset State + summary: Set Asset State + description: Set an asset state value. Creates or overwrites the key. + operationId: set_asset_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: asset_id + in: path + required: true + schema: + type: integer + title: Asset Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/AssetStateBody' + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Asset State + summary: Delete Asset State + description: Delete a single asset state key. No-op if the key does not exist. + operationId: delete_asset_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: asset_id + in: path + required: true + schema: + type: integer + title: Asset Id + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states: + get: + tags: + - Task State + summary: List Task States + description: List all task state entries for a task instance. + operationId: list_task_states + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskStateCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Task State + summary: Clear Task State + description: 'Delete all task state keys for a task instance. + + + When ``all_map_indices=true``, state is cleared for every map index of the + task and + + the ``map_index`` parameter is ignored.' + operationId: clear_task_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + - name: all_map_indices + in: query + required: false + schema: + type: boolean + default: false + title: All Map Indices + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}: + get: + tags: + - Task State + summary: Get Task State + description: Get a single task state entry. + operationId: get_task_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskStateResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + put: + tags: + - Task State + summary: Set Task State + description: Set a task state value. Creates or overwrites the key. + operationId: set_task_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/TaskStateBody' + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Task State + summary: Delete Task State + description: Delete a single task state key. No-op if the key does not exist. + operationId: delete_task_state + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: key + in: path + required: true + schema: + type: string + title: Key + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}: get: tags: @@ -10577,6 +11220,53 @@ components: - watchers title: AssetResponse description: Asset serializer for responses. + AssetStateBody: + properties: + value: + type: string + maxLength: 65535 + title: Value + additionalProperties: false + type: object + required: + - value + title: AssetStateBody + description: Request body for setting an asset state value. + AssetStateCollectionResponse: + properties: + asset_states: + items: + $ref: '#/components/schemas/AssetStateResponse' + type: array + title: Asset States + total_entries: + type: integer + title: Total Entries + type: object + required: + - asset_states + - total_entries + title: AssetStateCollectionResponse + description: All asset state entries for an asset. + AssetStateResponse: + properties: + key: + type: string + title: Key + value: + type: string + title: Value + updated_at: + type: string + format: date-time + title: Updated At + type: object + required: + - key + - value + - updated_at + title: AssetStateResponse + description: A single asset state key/value pair with metadata. AssetWatcherResponse: properties: name: @@ -14855,6 +15545,60 @@ components: - extra_links title: TaskResponse description: Task serializer for responses. + TaskStateBody: + properties: + value: + type: string + maxLength: 65535 + title: Value + additionalProperties: false + type: object + required: + - value + title: TaskStateBody + description: Request body for setting a task state value. + TaskStateCollectionResponse: + properties: + task_states: + items: + $ref: '#/components/schemas/TaskStateResponse' + type: array + title: Task States + total_entries: + type: integer + title: Total Entries + type: object + required: + - task_states + - total_entries + title: TaskStateCollectionResponse + description: All task state entries for a task instance. + TaskStateResponse: + properties: + key: + type: string + title: Key + value: + type: string + title: Value + updated_at: + type: string + format: date-time + title: Updated At + expires_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Expires At + type: object + required: + - key + - value + - updated_at + - expires_at + title: TaskStateResponse + description: A single task state key/value pair with metadata. TimeDelta: properties: __type: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py index 0b501b4f99f71..3905e29ccbcdf 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -21,6 +21,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.routes.public.asset_state import asset_state_router from airflow.api_fastapi.core_api.routes.public.assets import assets_router from airflow.api_fastapi.core_api.routes.public.auth import auth_router from airflow.api_fastapi.core_api.routes.public.backfills import backfills_router @@ -45,6 +46,7 @@ from airflow.api_fastapi.core_api.routes.public.pools import pools_router from airflow.api_fastapi.core_api.routes.public.providers import providers_router from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router +from airflow.api_fastapi.core_api.routes.public.task_state import task_state_router from airflow.api_fastapi.core_api.routes.public.tasks import tasks_router from airflow.api_fastapi.core_api.routes.public.variables import variables_router from airflow.api_fastapi.core_api.routes.public.version import version_router @@ -74,6 +76,8 @@ authenticated_router.include_router(plugins_router) authenticated_router.include_router(pools_router) authenticated_router.include_router(providers_router) +authenticated_router.include_router(asset_state_router) +authenticated_router.include_router(task_state_router) authenticated_router.include_router(xcom_router) authenticated_router.include_router(task_instances_router) authenticated_router.include_router(tasks_router) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py new file mode 100644 index 0000000000000..43580e0c762c7 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, HTTPException, status +from sqlalchemy import select + +from airflow._shared.state import AssetScope +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.asset_state import ( + AssetStateBody, + AssetStateCollectionResponse, + AssetStateResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.security import requires_access_asset +from airflow.models.asset import AssetModel +from airflow.models.asset_state import AssetStateModel +from airflow.state import get_state_backend + +asset_state_router = AirflowRouter( + tags=["Asset State"], + prefix="/assets/{asset_id}/states", +) + + +def _get_asset_or_404(asset_id: int, session: SessionDep) -> int: + exists = session.scalar(select(AssetModel.id).where(AssetModel.id == asset_id)) + if exists is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Asset with id {asset_id!r} not found", + ) + return asset_id + + +AssetIdDep = Annotated[int, Depends(_get_asset_or_404)] + + +@asset_state_router.get( + "", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], +) +def list_asset_states( + asset_id: AssetIdDep, + limit: QueryLimit, + offset: QueryOffset, + session: SessionDep, +) -> AssetStateCollectionResponse: + """List all state entries for an asset.""" + base = ( + select( + AssetStateModel.key, + AssetStateModel.value, + AssetStateModel.updated_at, + ) + .where(AssetStateModel.asset_id == asset_id) + .order_by(AssetStateModel.key.asc()) + ) + paginated, total_entries = paginated_select( + statement=base, + filters=None, + order_by=None, + offset=offset, + limit=limit, + session=session, + ) + rows = session.execute(paginated).all() + entries = [AssetStateResponse(key=r.key, value=r.value, updated_at=r.updated_at) for r in rows] + return AssetStateCollectionResponse(asset_states=entries, total_entries=total_entries) + + +@asset_state_router.get( + "/{key:path}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], +) +def get_asset_state( + asset_id: AssetIdDep, + key: str, + session: SessionDep, +) -> AssetStateResponse: + """Get a single asset state entry.""" + row = session.execute( + select( + AssetStateModel.key, + AssetStateModel.value, + AssetStateModel.updated_at, + ).where( + AssetStateModel.asset_id == asset_id, + AssetStateModel.key == key, + ) + ).one_or_none() + if row is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Asset state key {key!r} not found", + ) + return AssetStateResponse(key=row.key, value=row.value, updated_at=row.updated_at) + + +@asset_state_router.put( + "/{key:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="PUT"))], +) +def set_asset_state( + asset_id: AssetIdDep, + key: str, + body: AssetStateBody, + session: SessionDep, +) -> None: + """Set an asset state value. Creates or overwrites the key.""" + get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) + + +@asset_state_router.delete( + "/{key:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="DELETE"))], +) +def delete_asset_state( + asset_id: AssetIdDep, + key: str, + session: SessionDep, +) -> None: + """Delete a single asset state key. No-op if the key does not exist.""" + get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) + + +@asset_state_router.delete( + "", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="DELETE"))], +) +def clear_asset_state( + asset_id: AssetIdDep, + session: SessionDep, +) -> None: + """Delete all state keys for an asset.""" + get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py new file mode 100644 index 0000000000000..138380232a8aa --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py @@ -0,0 +1,210 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, HTTPException, Query, status +from sqlalchemy import select + +from airflow._shared.state import TaskScope +from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.task_state import ( + TaskStateBody, + TaskStateCollectionResponse, + TaskStateResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.security import requires_access_dag +from airflow.models.task_state import TaskStateModel +from airflow.models.taskinstance import TaskInstance as TI +from airflow.state import get_state_backend + +task_state_router = AirflowRouter( + tags=["Task State"], + prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states", +) + + +def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> TaskScope: + return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, map_index=map_index) + + +@task_state_router.get( + "", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def list_task_states( + dag_id: str, + dag_run_id: str, + task_id: str, + limit: QueryLimit, + offset: QueryOffset, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, +) -> TaskStateCollectionResponse: + """List all task state entries for a task instance.""" + base = ( + select( + TaskStateModel.key, + TaskStateModel.value, + TaskStateModel.updated_at, + TaskStateModel.expires_at, + ) + .where( + TaskStateModel.dag_id == dag_id, + TaskStateModel.run_id == dag_run_id, + TaskStateModel.task_id == task_id, + TaskStateModel.map_index == map_index, + ) + .order_by(TaskStateModel.key.asc()) + ) + paginated, total_entries = paginated_select( + statement=base, + filters=None, + order_by=None, + offset=offset, + limit=limit, + session=session, + ) + rows = session.execute(paginated).all() + entries = [ + TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, expires_at=r.expires_at) + for r in rows + ] + return TaskStateCollectionResponse(task_states=entries, total_entries=total_entries) + + +@task_state_router.get( + "/{key:path}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def get_task_state( + dag_id: str, + dag_run_id: str, + task_id: str, + key: str, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, +) -> TaskStateResponse: + """Get a single task state entry.""" + row = session.execute( + select( + TaskStateModel.key, + TaskStateModel.value, + TaskStateModel.updated_at, + TaskStateModel.expires_at, + ).where( + TaskStateModel.dag_id == dag_id, + TaskStateModel.run_id == dag_run_id, + TaskStateModel.task_id == task_id, + TaskStateModel.map_index == map_index, + TaskStateModel.key == key, + ) + ).one_or_none() + if row is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Task state key {key!r} not found", + ) + return TaskStateResponse( + key=row.key, value=row.value, updated_at=row.updated_at, expires_at=row.expires_at + ) + + +@task_state_router.put( + "/{key:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def set_task_state( + dag_id: str, + dag_run_id: str, + task_id: str, + key: str, + body: TaskStateBody, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, +) -> None: + """Set a task state value. Creates or overwrites the key.""" + ti_exists = session.scalar( + select(TI.task_id).where( + TI.dag_id == dag_id, + TI.run_id == dag_run_id, + TI.task_id == task_id, + TI.map_index == map_index, + ) + ) + if ti_exists is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Task instance not found for dag_id={dag_id!r}, run_id={dag_run_id!r}, task_id={task_id!r}, map_index={map_index}", + ) + scope = _get_scope(dag_id, dag_run_id, task_id, map_index) + try: + get_state_backend().set(scope, key, body.value, session=session) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e + + +@task_state_router.delete( + "/{key:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def delete_task_state( + dag_id: str, + dag_run_id: str, + task_id: str, + key: str, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, +) -> None: + """Delete a single task state key. No-op if the key does not exist.""" + scope = _get_scope(dag_id, dag_run_id, task_id, map_index) + get_state_backend().delete(scope, key, session=session) + + +@task_state_router.delete( + "", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.TASK_INSTANCE))], +) +def clear_task_state( + dag_id: str, + dag_run_id: str, + task_id: str, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, + all_map_indices: Annotated[bool, Query()] = False, +) -> None: + """ + Delete all task state keys for a task instance. + + When ``all_map_indices=true``, state is cleared for every map index of the task and + the ``map_index`` parameter is ignored. + """ + scope = _get_scope(dag_id, dag_run_id, task_id, map_index) + get_state_backend().clear(scope, all_map_indices=all_map_indices, session=session) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 0e44f9f63bd74..8a950a0af2845 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryResult } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; export type AssetServiceGetAssetsDefaultResponse = Awaited>; export type AssetServiceGetAssetsQueryResult = UseQueryResult; @@ -723,6 +723,42 @@ export const UseProviderServiceGetProvidersKeyFn = ({ limit, offset }: { limit?: number; offset?: number; } = {}, queryKey?: Array) => [useProviderServiceGetProvidersKey, ...(queryKey ?? [{ limit, offset }])]; +export type AssetStateServiceListAssetStatesDefaultResponse = Awaited>; +export type AssetStateServiceListAssetStatesQueryResult = UseQueryResult; +export const useAssetStateServiceListAssetStatesKey = "AssetStateServiceListAssetStates"; +export const UseAssetStateServiceListAssetStatesKeyFn = ({ assetId, limit, offset }: { + assetId: number; + limit?: number; + offset?: number; +}, queryKey?: Array) => [useAssetStateServiceListAssetStatesKey, ...(queryKey ?? [{ assetId, limit, offset }])]; +export type AssetStateServiceGetAssetStateDefaultResponse = Awaited>; +export type AssetStateServiceGetAssetStateQueryResult = UseQueryResult; +export const useAssetStateServiceGetAssetStateKey = "AssetStateServiceGetAssetState"; +export const UseAssetStateServiceGetAssetStateKeyFn = ({ assetId, key }: { + assetId: number; + key: string; +}, queryKey?: Array) => [useAssetStateServiceGetAssetStateKey, ...(queryKey ?? [{ assetId, key }])]; +export type TaskStateServiceListTaskStatesDefaultResponse = Awaited>; +export type TaskStateServiceListTaskStatesQueryResult = UseQueryResult; +export const useTaskStateServiceListTaskStatesKey = "TaskStateServiceListTaskStates"; +export const UseTaskStateServiceListTaskStatesKeyFn = ({ dagId, dagRunId, limit, mapIndex, offset, taskId }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}, queryKey?: Array) => [useTaskStateServiceListTaskStatesKey, ...(queryKey ?? [{ dagId, dagRunId, limit, mapIndex, offset, taskId }])]; +export type TaskStateServiceGetTaskStateDefaultResponse = Awaited>; +export type TaskStateServiceGetTaskStateQueryResult = UseQueryResult; +export const useTaskStateServiceGetTaskStateKey = "TaskStateServiceGetTaskState"; +export const UseTaskStateServiceGetTaskStateKeyFn = ({ dagId, dagRunId, key, mapIndex, taskId }: { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}, queryKey?: Array) => [useTaskStateServiceGetTaskStateKey, ...(queryKey ?? [{ dagId, dagRunId, key, mapIndex, taskId }])]; export type XcomServiceGetXcomEntryDefaultResponse = Awaited>; export type XcomServiceGetXcomEntryQueryResult = UseQueryResult; export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry"; @@ -1008,6 +1044,8 @@ export type AuthLinksServiceGenerateTokenMutationResult = Awaited>; export type BackfillServiceUnpauseBackfillMutationResult = Awaited>; export type BackfillServiceCancelBackfillMutationResult = Awaited>; +export type AssetStateServiceSetAssetStateMutationResult = Awaited>; +export type TaskStateServiceSetTaskStateMutationResult = Awaited>; export type DagParsingServiceReparseDagFileMutationResult = Awaited>; export type ConnectionServicePatchConnectionMutationResult = Awaited>; export type ConnectionServiceBulkConnectionsMutationResult = Awaited>; @@ -1035,5 +1073,9 @@ export type DagRunServiceDeleteDagRunMutationResult = Awaited>; export type TaskInstanceServiceDeleteTaskInstanceMutationResult = Awaited>; export type PoolServiceDeletePoolMutationResult = Awaited>; +export type AssetStateServiceClearAssetStateMutationResult = Awaited>; +export type AssetStateServiceDeleteAssetStateMutationResult = Awaited>; +export type TaskStateServiceClearTaskStateMutationResult = Awaited>; +export type TaskStateServiceDeleteTaskStateMutationResult = Awaited>; export type XcomServiceDeleteXcomEntryMutationResult = Awaited>; export type VariableServiceDeleteVariableMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index bf9087edc6597..0dff4bcd7d8c9 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -1464,6 +1464,74 @@ export const ensureUseProviderServiceGetProvidersData = (queryClient: QueryClien offset?: number; } = {}) => queryClient.ensureQueryData({ queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () => ProviderService.getProviders({ limit, offset }) }); /** +* List Asset States +* List all state entries for an asset. +* @param data The data for the request. +* @param data.assetId +* @param data.limit +* @param data.offset +* @returns AssetStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const ensureUseAssetStateServiceListAssetStatesData = (queryClient: QueryClient, { assetId, limit, offset }: { + assetId: number; + limit?: number; + offset?: number; +}) => queryClient.ensureQueryData({ queryKey: Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) }); +/** +* Get Asset State +* Get a single asset state entry. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @returns AssetStateResponse Successful Response +* @throws ApiError +*/ +export const ensureUseAssetStateServiceGetAssetStateData = (queryClient: QueryClient, { assetId, key }: { + assetId: number; + key: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () => AssetStateService.getAssetState({ assetId, key }) }); +/** +* List Task States +* List all task state entries for a task instance. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.mapIndex +* @param data.limit +* @param data.offset +* @returns TaskStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const ensureUseTaskStateServiceListTaskStatesData = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, taskId }) }); +/** +* Get Task State +* Get a single task state entry. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.mapIndex +* @returns TaskStateResponse Successful Response +* @throws ApiError +*/ +export const ensureUseTaskStateServiceGetTaskStateData = (queryClient: QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, mapIndex, taskId }) }); +/** * Get Xcom Entry * Get an XCom entry. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 18dc03745657b..65d4226f39e84 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -1464,6 +1464,74 @@ export const prefetchUseProviderServiceGetProviders = (queryClient: QueryClient, offset?: number; } = {}) => queryClient.prefetchQuery({ queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () => ProviderService.getProviders({ limit, offset }) }); /** +* List Asset States +* List all state entries for an asset. +* @param data The data for the request. +* @param data.assetId +* @param data.limit +* @param data.offset +* @returns AssetStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseAssetStateServiceListAssetStates = (queryClient: QueryClient, { assetId, limit, offset }: { + assetId: number; + limit?: number; + offset?: number; +}) => queryClient.prefetchQuery({ queryKey: Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) }); +/** +* Get Asset State +* Get a single asset state entry. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @returns AssetStateResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseAssetStateServiceGetAssetState = (queryClient: QueryClient, { assetId, key }: { + assetId: number; + key: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () => AssetStateService.getAssetState({ assetId, key }) }); +/** +* List Task States +* List all task state entries for a task instance. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.mapIndex +* @param data.limit +* @param data.offset +* @returns TaskStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseTaskStateServiceListTaskStates = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, taskId }) }); +/** +* Get Task State +* Get a single task state entry. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.mapIndex +* @returns TaskStateResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseTaskStateServiceGetTaskState = (queryClient: QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, mapIndex, taskId }) }); +/** * Get Xcom Entry * Get an XCom entry. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index dc43dd038afb1..ca750ab326aa4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1,8 +1,8 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetStateBody, BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -1464,6 +1464,74 @@ export const useProviderServiceGetProviders = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey), queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, ...options }); /** +* List Asset States +* List all state entries for an asset. +* @param data The data for the request. +* @param data.assetId +* @param data.limit +* @param data.offset +* @returns AssetStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceListAssetStates = = unknown[]>({ assetId, limit, offset }: { + assetId: number; + limit?: number; + offset?: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }, queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) as TData, ...options }); +/** +* Get Asset State +* Get a single asset state entry. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @returns AssetStateResponse Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceGetAssetState = = unknown[]>({ assetId, key }: { + assetId: number; + key: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey), queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData, ...options }); +/** +* List Task States +* List all task state entries for a task instance. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.mapIndex +* @param data.limit +* @param data.offset +* @returns TaskStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceListTaskStates = = unknown[]>({ dagId, dagRunId, limit, mapIndex, offset, taskId }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId }, queryKey), queryFn: () => TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, taskId }) as TData, ...options }); +/** +* Get Task State +* Get a single task state entry. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.mapIndex +* @returns TaskStateResponse Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceGetTaskState = = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }: { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, mapIndex, taskId }) as TData, ...options }); +/** * Get Xcom Entry * Get an XCom entry. * @param data The data for the request. @@ -2288,6 +2356,53 @@ export const useBackfillServiceCancelBackfill = ({ mutationFn: ({ backfillId }) => BackfillService.cancelBackfill({ backfillId }) as unknown as Promise, ...options }); /** +* Set Asset State +* Set an asset state value. Creates or overwrites the key. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @param data.requestBody +* @returns void Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceSetAssetState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ assetId, key, requestBody }) => AssetStateService.setAssetState({ assetId, key, requestBody }) as unknown as Promise, ...options }); +/** +* Set Task State +* Set a task state value. Creates or overwrites the key. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.requestBody +* @param data.mapIndex +* @returns void Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceSetTaskState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, dagRunId, key, mapIndex, requestBody, taskId }) => TaskStateService.setTaskState({ dagId, dagRunId, key, mapIndex, requestBody, taskId }) as unknown as Promise, ...options }); +/** * Reparse Dag File * Request re-parsing a Dag file. * @param data The data for the request. @@ -2842,6 +2957,88 @@ export const usePoolServiceDeletePool = ({ mutationFn: ({ poolName }) => PoolService.deletePool({ poolName }) as unknown as Promise, ...options }); /** +* Clear Asset State +* Delete all state keys for an asset. +* @param data The data for the request. +* @param data.assetId +* @returns void Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceClearAssetState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ assetId }) => AssetStateService.clearAssetState({ assetId }) as unknown as Promise, ...options }); +/** +* Delete Asset State +* Delete a single asset state key. No-op if the key does not exist. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @returns void Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceDeleteAssetState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ assetId, key }) => AssetStateService.deleteAssetState({ assetId, key }) as unknown as Promise, ...options }); +/** +* Clear Task State +* Delete all task state keys for a task instance. +* +* When ``all_map_indices=true``, state is cleared for every map index of the task and +* the ``map_index`` parameter is ignored. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.mapIndex +* @param data.allMapIndices +* @returns void Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceClearTaskState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ allMapIndices, dagId, dagRunId, mapIndex, taskId }) => TaskStateService.clearTaskState({ allMapIndices, dagId, dagRunId, mapIndex, taskId }) as unknown as Promise, ...options }); +/** +* Delete Task State +* Delete a single task state key. No-op if the key does not exist. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.mapIndex +* @returns void Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceDeleteTaskState = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, dagRunId, key, mapIndex, taskId }) => TaskStateService.deleteTaskState({ dagId, dagRunId, key, mapIndex, taskId }) as unknown as Promise, ...options }); +/** * Delete Xcom Entry * Delete an XCom entry. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 9fdc07928707f..941bc25713fa8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AssetStateService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -1464,6 +1464,74 @@ export const useProviderServiceGetProvidersSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey), queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, ...options }); /** +* List Asset States +* List all state entries for an asset. +* @param data The data for the request. +* @param data.assetId +* @param data.limit +* @param data.offset +* @returns AssetStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceListAssetStatesSuspense = = unknown[]>({ assetId, limit, offset }: { + assetId: number; + limit?: number; + offset?: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }, queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) as TData, ...options }); +/** +* Get Asset State +* Get a single asset state entry. +* @param data The data for the request. +* @param data.key +* @param data.assetId +* @returns AssetStateResponse Successful Response +* @throws ApiError +*/ +export const useAssetStateServiceGetAssetStateSuspense = = unknown[]>({ assetId, key }: { + assetId: number; + key: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey), queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData, ...options }); +/** +* List Task States +* List all task state entries for a task instance. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.mapIndex +* @param data.limit +* @param data.offset +* @returns TaskStateCollectionResponse Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceListTaskStatesSuspense = = unknown[]>({ dagId, dagRunId, limit, mapIndex, offset, taskId }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId }, queryKey), queryFn: () => TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, taskId }) as TData, ...options }); +/** +* Get Task State +* Get a single task state entry. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.taskId +* @param data.key +* @param data.mapIndex +* @returns TaskStateResponse Successful Response +* @throws ApiError +*/ +export const useTaskStateServiceGetTaskStateSuspense = = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }: { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, mapIndex, taskId }) as TData, ...options }); +/** * Get Xcom Entry * Get an XCom entry. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 99b1c49345548..9fc0f4c7fb5af 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -381,6 +381,63 @@ export const $AssetResponse = { description: 'Asset serializer for responses.' } as const; +export const $AssetStateBody = { + properties: { + value: { + type: 'string', + maxLength: 65535, + title: 'Value' + } + }, + additionalProperties: false, + type: 'object', + required: ['value'], + title: 'AssetStateBody', + description: 'Request body for setting an asset state value.' +} as const; + +export const $AssetStateCollectionResponse = { + properties: { + asset_states: { + items: { + '$ref': '#/components/schemas/AssetStateResponse' + }, + type: 'array', + title: 'Asset States' + }, + total_entries: { + type: 'integer', + title: 'Total Entries' + } + }, + type: 'object', + required: ['asset_states', 'total_entries'], + title: 'AssetStateCollectionResponse', + description: 'All asset state entries for an asset.' +} as const; + +export const $AssetStateResponse = { + properties: { + key: { + type: 'string', + title: 'Key' + }, + value: { + type: 'string', + title: 'Value' + }, + updated_at: { + type: 'string', + format: 'date-time', + title: 'Updated At' + } + }, + type: 'object', + required: ['key', 'value', 'updated_at'], + title: 'AssetStateResponse', + description: 'A single asset state key/value pair with metadata.' +} as const; + export const $AssetWatcherResponse = { properties: { name: { @@ -6738,6 +6795,75 @@ export const $TaskResponse = { description: 'Task serializer for responses.' } as const; +export const $TaskStateBody = { + properties: { + value: { + type: 'string', + maxLength: 65535, + title: 'Value' + } + }, + additionalProperties: false, + type: 'object', + required: ['value'], + title: 'TaskStateBody', + description: 'Request body for setting a task state value.' +} as const; + +export const $TaskStateCollectionResponse = { + properties: { + task_states: { + items: { + '$ref': '#/components/schemas/TaskStateResponse' + }, + type: 'array', + title: 'Task States' + }, + total_entries: { + type: 'integer', + title: 'Total Entries' + } + }, + type: 'object', + required: ['task_states', 'total_entries'], + title: 'TaskStateCollectionResponse', + description: 'All task state entries for a task instance.' +} as const; + +export const $TaskStateResponse = { + properties: { + key: { + type: 'string', + title: 'Key' + }, + value: { + type: 'string', + title: 'Value' + }, + updated_at: { + type: 'string', + format: 'date-time', + title: 'Updated At' + }, + expires_at: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Expires At' + } + }, + type: 'object', + required: ['key', 'value', 'updated_at', 'expires_at'], + title: 'TaskStateResponse', + description: 'A single task state key/value pair with metadata.' +} as const; + export const $TimeDelta = { properties: { __type: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index ec044ac30856d..4cea704a54b13 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -3493,6 +3493,325 @@ export class ProviderService { } +export class AssetStateService { + /** + * List Asset States + * List all state entries for an asset. + * @param data The data for the request. + * @param data.assetId + * @param data.limit + * @param data.offset + * @returns AssetStateCollectionResponse Successful Response + * @throws ApiError + */ + public static listAssetStates(data: ListAssetStatesData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/assets/{asset_id}/states', + path: { + asset_id: data.assetId + }, + query: { + limit: data.limit, + offset: data.offset + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Clear Asset State + * Delete all state keys for an asset. + * @param data The data for the request. + * @param data.assetId + * @returns void Successful Response + * @throws ApiError + */ + public static clearAssetState(data: ClearAssetStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'DELETE', + url: '/api/v2/assets/{asset_id}/states', + path: { + asset_id: data.assetId + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Get Asset State + * Get a single asset state entry. + * @param data The data for the request. + * @param data.key + * @param data.assetId + * @returns AssetStateResponse Successful Response + * @throws ApiError + */ + public static getAssetState(data: GetAssetStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/assets/{asset_id}/states/{key}', + path: { + key: data.key, + asset_id: data.assetId + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Set Asset State + * Set an asset state value. Creates or overwrites the key. + * @param data The data for the request. + * @param data.key + * @param data.assetId + * @param data.requestBody + * @returns void Successful Response + * @throws ApiError + */ + public static setAssetState(data: SetAssetStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'PUT', + url: '/api/v2/assets/{asset_id}/states/{key}', + path: { + key: data.key, + asset_id: data.assetId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Delete Asset State + * Delete a single asset state key. No-op if the key does not exist. + * @param data The data for the request. + * @param data.key + * @param data.assetId + * @returns void Successful Response + * @throws ApiError + */ + public static deleteAssetState(data: DeleteAssetStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'DELETE', + url: '/api/v2/assets/{asset_id}/states/{key}', + path: { + key: data.key, + asset_id: data.assetId + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + +} + +export class TaskStateService { + /** + * List Task States + * List all task state entries for a task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @param data.limit + * @param data.offset + * @returns TaskStateCollectionResponse Successful Response + * @throws ApiError + */ + public static listTaskStates(data: ListTaskStatesData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId + }, + query: { + map_index: data.mapIndex, + limit: data.limit, + offset: data.offset + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Clear Task State + * Delete all task state keys for a task instance. + * + * When ``all_map_indices=true``, state is cleared for every map index of the task and + * the ``map_index`` parameter is ignored. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.mapIndex + * @param data.allMapIndices + * @returns void Successful Response + * @throws ApiError + */ + public static clearTaskState(data: ClearTaskStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'DELETE', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId + }, + query: { + map_index: data.mapIndex, + all_map_indices: data.allMapIndices + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Get Task State + * Get a single task state entry. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.key + * @param data.mapIndex + * @returns TaskStateResponse Successful Response + * @throws ApiError + */ + public static getTaskState(data: GetTaskStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + key: data.key + }, + query: { + map_index: data.mapIndex + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Set Task State + * Set a task state value. Creates or overwrites the key. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.key + * @param data.requestBody + * @param data.mapIndex + * @returns void Successful Response + * @throws ApiError + */ + public static setTaskState(data: SetTaskStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'PUT', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + key: data.key + }, + query: { + map_index: data.mapIndex + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Delete Task State + * Delete a single task state key. No-op if the key does not exist. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.key + * @param data.mapIndex + * @returns void Successful Response + * @throws ApiError + */ + public static deleteTaskState(data: DeleteTaskStateData): CancelablePromise { + return __request(OpenAPI, { + method: 'DELETE', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + key: data.key + }, + query: { + map_index: data.mapIndex + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + +} + export class XcomService { /** * Get Xcom Entry diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 4d401bfc1a496..c40f0db1f5ae2 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -96,6 +96,30 @@ export type AssetResponse = { last_asset_event?: LastAssetEventResponse | null; }; +/** + * Request body for setting an asset state value. + */ +export type AssetStateBody = { + value: string; +}; + +/** + * All asset state entries for an asset. + */ +export type AssetStateCollectionResponse = { + asset_states: Array; + total_entries: number; +}; + +/** + * A single asset state key/value pair with metadata. + */ +export type AssetStateResponse = { + key: string; + value: string; + updated_at: string; +}; + /** * Asset watcher serializer for responses. */ @@ -1631,6 +1655,31 @@ export type TaskResponse = { readonly extra_links: Array<(string)>; }; +/** + * Request body for setting a task state value. + */ +export type TaskStateBody = { + value: string; +}; + +/** + * All task state entries for a task instance. + */ +export type TaskStateCollectionResponse = { + task_states: Array; + total_entries: number; +}; + +/** + * A single task state key/value pair with metadata. + */ +export type TaskStateResponse = { + key: string; + value: string; + updated_at: string; + expires_at: string | null; +}; + /** * TimeDelta can be used to interact with datetime.timedelta objects. */ @@ -3755,6 +3804,94 @@ export type GetProvidersData = { export type GetProvidersResponse = ProviderCollectionResponse; +export type ListAssetStatesData = { + assetId: number; + limit?: number; + offset?: number; +}; + +export type ListAssetStatesResponse = AssetStateCollectionResponse; + +export type ClearAssetStateData = { + assetId: number; +}; + +export type ClearAssetStateResponse = void; + +export type GetAssetStateData = { + assetId: number; + key: string; +}; + +export type GetAssetStateResponse = AssetStateResponse; + +export type SetAssetStateData = { + assetId: number; + key: string; + requestBody: AssetStateBody; +}; + +export type SetAssetStateResponse = void; + +export type DeleteAssetStateData = { + assetId: number; + key: string; +}; + +export type DeleteAssetStateResponse = void; + +export type ListTaskStatesData = { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; +}; + +export type ListTaskStatesResponse = TaskStateCollectionResponse; + +export type ClearTaskStateData = { + allMapIndices?: boolean; + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; +}; + +export type ClearTaskStateResponse = void; + +export type GetTaskStateData = { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}; + +export type GetTaskStateResponse = TaskStateResponse; + +export type SetTaskStateData = { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + requestBody: TaskStateBody; + taskId: string; +}; + +export type SetTaskStateResponse = void; + +export type DeleteTaskStateData = { + dagId: string; + dagRunId: string; + key: string; + mapIndex?: number; + taskId: string; +}; + +export type DeleteTaskStateResponse = void; + export type GetXcomEntryData = { dagId: string; dagRunId: string; @@ -6790,6 +6927,264 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/assets/{asset_id}/states': { + get: { + req: ListAssetStatesData; + res: { + /** + * Successful Response + */ + 200: AssetStateCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + delete: { + req: ClearAssetStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/api/v2/assets/{asset_id}/states/{key}': { + get: { + req: GetAssetStateData; + res: { + /** + * Successful Response + */ + 200: AssetStateResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + put: { + req: SetAssetStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + delete: { + req: DeleteAssetStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states': { + get: { + req: ListTaskStatesData; + res: { + /** + * Successful Response + */ + 200: TaskStateCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + delete: { + req: ClearTaskStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}': { + get: { + req: GetTaskStateData; + res: { + /** + * Successful Response + */ + 200: TaskStateResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + put: { + req: SetTaskStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + delete: { + req: DeleteTaskStateData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}': { get: { req: GetXcomEntryData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py new file mode 100644 index 0000000000000..c53fbb99ee91b --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py @@ -0,0 +1,255 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.models.asset import AssetModel +from airflow.models.asset_state import AssetStateModel + +from tests_common.test_utils.db import clear_db_assets + +pytestmark = pytest.mark.db_test + +ASSET_URI = "s3://bucket/watermarks" +ASSET_NAME = "test_asset" + + +def _create_asset(session) -> AssetModel: + asset = AssetModel(uri=ASSET_URI, name=ASSET_NAME, group="test") + session.add(asset) + session.flush() + return asset + + +def _create_asset_state(session, asset_id: int, key: str, value: str) -> None: + row = AssetStateModel(asset_id=asset_id, key=key, value=value) + session.add(row) + session.flush() + + +class TestAssetStateEndpoint: + @staticmethod + def clear_db(): + clear_db_assets() + + @pytest.fixture(autouse=True) + def setup(self, session): + self.clear_db() + self.asset = _create_asset(session) + session.commit() + self._session = session + self._base_url = f"/assets/{self.asset.id}/states" + + def teardown_method(self): + self.clear_db() + + +class TestAssetNotFound(TestAssetStateEndpoint): + """All endpoints return 404 when asset_id does not exist.""" + + def test_list_unknown_asset_returns_404(self, test_client): + assert test_client.get("/assets/99999/states").status_code == 404 + + def test_get_unknown_asset_returns_404(self, test_client): + assert test_client.get("/assets/99999/states/key").status_code == 404 + + def test_set_unknown_asset_returns_404(self, test_client): + assert test_client.put("/assets/99999/states/key", json={"value": "v"}).status_code == 404 + + def test_delete_unknown_asset_returns_404(self, test_client): + assert test_client.delete("/assets/99999/states/key").status_code == 404 + + def test_clear_unknown_asset_returns_404(self, test_client): + assert test_client.delete("/assets/99999/states").status_code == 404 + + +class TestListAssetState(TestAssetStateEndpoint): + def test_returns_empty_list_when_no_state(self, test_client): + response = test_client.get(self._base_url) + assert response.status_code == 200 + assert response.json() == {"asset_states": [], "total_entries": 0} + + def test_returns_all_keys(self, test_client): + _create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01") + _create_asset_state(self._session, self.asset.id, "file_count", "42") + self._session.commit() + + response = test_client.get(self._base_url) + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 2 + keys = {item["key"]: item["value"] for item in data["asset_states"]} + assert keys == {"watermark": "2026-05-01", "file_count": "42"} + + def test_returns_metadata_fields(self, test_client): + _create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01") + self._session.commit() + + item = test_client.get(self._base_url).json()["asset_states"][0] + assert "updated_at" in item + assert item["key"] == "watermark" + + def test_pagination_limit(self, test_client): + for k in ("watermark", "file_count", "last_run"): + _create_asset_state(self._session, self.asset.id, k, "v") + self._session.commit() + + response = test_client.get(f"{self._base_url}?limit=2") + data = response.json() + assert data["total_entries"] == 3 + assert len(data["asset_states"]) == 2 + + def test_pagination_offset(self, test_client): + for k in ("watermark", "file_count", "last_run"): + _create_asset_state(self._session, self.asset.id, k, "v") + self._session.commit() + + response = test_client.get(f"{self._base_url}?limit=2&offset=2") + data = response.json() + assert data["total_entries"] == 3 + assert len(data["asset_states"]) == 1 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.get(self._base_url).status_code == 401 + + +class TestGetAssetState(TestAssetStateEndpoint): + def test_returns_value(self, test_client): + _create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01") + self._session.commit() + + response = test_client.get(f"{self._base_url}/watermark") + assert response.status_code == 200 + data = response.json() + assert data["key"] == "watermark" + assert data["value"] == "2026-05-01" + assert "updated_at" in data + + def test_missing_key_returns_404(self, test_client): + assert test_client.get(f"{self._base_url}/nonexistent").status_code == 404 + + def test_key_with_slash_is_supported(self, test_client): + """Keys containing slashes must work — route uses {key:path}.""" + _create_asset_state(self._session, self.asset.id, "partition/date", "2026-05-01") + self._session.commit() + + response = test_client.get(f"{self._base_url}/partition/date") + assert response.status_code == 200 + assert response.json()["key"] == "partition/date" + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.get(f"{self._base_url}/watermark").status_code == 401 + + +class TestSetAssetState(TestAssetStateEndpoint): + def test_creates_new_key(self, test_client): + response = test_client.put(f"{self._base_url}/watermark", json={"value": "2026-05-01"}) + assert response.status_code == 204 + + assert test_client.get(f"{self._base_url}/watermark").json()["value"] == "2026-05-01" + + def test_overwrites_existing_key(self, test_client): + test_client.put(f"{self._base_url}/watermark", json={"value": "v1"}) + test_client.put(f"{self._base_url}/watermark", json={"value": "v2"}) + + assert test_client.get(f"{self._base_url}/watermark").json()["value"] == "v2" + + def test_empty_body_returns_422(self, test_client): + assert test_client.put(f"{self._base_url}/watermark", json={}).status_code == 422 + + def test_oversized_value_returns_422(self, test_client): + assert test_client.put(f"{self._base_url}/watermark", json={"value": "x" * 65536}).status_code == 422 + + def test_key_with_slash_is_supported(self, test_client): + response = test_client.put(f"{self._base_url}/partition/date", json={"value": "2026-05-01"}) + assert response.status_code == 204 + assert test_client.get(f"{self._base_url}/partition/date").json()["key"] == "partition/date" + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert ( + unauthenticated_test_client.put(f"{self._base_url}/watermark", json={"value": "v"}).status_code + == 401 + ) + + +class TestDeleteAssetState(TestAssetStateEndpoint): + def test_deletes_key(self, test_client): + _create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01") + self._session.commit() + + assert test_client.delete(f"{self._base_url}/watermark").status_code == 204 + assert test_client.get(f"{self._base_url}/watermark").status_code == 404 + + def test_delete_noop_for_missing_key(self, test_client): + assert test_client.delete(f"{self._base_url}/nonexistent").status_code == 204 + + def test_only_deletes_target_key(self, test_client): + _create_asset_state(self._session, self.asset.id, "watermark", "a") + _create_asset_state(self._session, self.asset.id, "file_count", "b") + self._session.commit() + + test_client.delete(f"{self._base_url}/watermark") + + assert test_client.get(f"{self._base_url}/watermark").status_code == 404 + assert test_client.get(f"{self._base_url}/file_count").json()["value"] == "b" + + def test_key_with_slash_is_supported(self, test_client): + _create_asset_state(self._session, self.asset.id, "partition/date", "v") + self._session.commit() + + assert test_client.delete(f"{self._base_url}/partition/date").status_code == 204 + assert test_client.get(f"{self._base_url}/partition/date").status_code == 404 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.delete(f"{self._base_url}/watermark").status_code == 401 + + +class TestClearAssetState(TestAssetStateEndpoint): + def test_clears_all_keys(self, test_client): + for k, v in [("watermark", "a"), ("file_count", "b"), ("last_run", "c")]: + _create_asset_state(self._session, self.asset.id, k, v) + self._session.commit() + + assert test_client.delete(self._base_url).status_code == 204 + assert test_client.get(self._base_url).json()["total_entries"] == 0 + + def test_clear_is_noop_when_no_state(self, test_client): + assert test_client.delete(self._base_url).status_code == 204 + + def test_clear_does_not_affect_other_assets(self, test_client): + other_asset = AssetModel(uri="s3://other/asset", name="other_asset", group="test") + self._session.add(other_asset) + self._session.flush() + _create_asset_state(self._session, self.asset.id, "watermark", "mine") + _create_asset_state(self._session, other_asset.id, "watermark", "theirs") + self._session.commit() + + test_client.delete(self._base_url) + + other_url = f"/assets/{other_asset.id}/states" + assert test_client.get(f"{other_url}/watermark").json()["value"] == "theirs" + + def test_clears_slash_keyed_entries(self, test_client): + _create_asset_state(self._session, self.asset.id, "partition/date", "v") + self._session.commit() + + assert test_client.delete(self._base_url).status_code == 204 + assert test_client.get(self._base_url).json()["total_entries"] == 0 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.delete(self._base_url).status_code == 401 diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py new file mode 100644 index 0000000000000..c53212fa5e444 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py @@ -0,0 +1,293 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest +from sqlalchemy import select + +from airflow._shared.timezones import timezone +from airflow.models.dagrun import DagRun +from airflow.models.task_state import TaskStateModel +from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.utils.types import DagRunType + +from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_runs + +pytestmark = pytest.mark.db_test + +DAG_ID = "test_dag" +TASK_ID = "test_task" +LOGICAL_DATE = timezone.datetime(2026, 1, 1) +RUN_ID = DagRun.generate_run_id(run_type=DagRunType.MANUAL, logical_date=LOGICAL_DATE, run_after=LOGICAL_DATE) + +BASE_URL = f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/states" + + +def _create_dag_run(dag_maker, session): + with dag_maker(DAG_ID, schedule=None, start_date=LOGICAL_DATE): + EmptyOperator(task_id=TASK_ID) + dag_maker.create_dagrun(run_id=RUN_ID, run_type=DagRunType.MANUAL, logical_date=LOGICAL_DATE) + dag_maker.sync_dagbag_to_db() + session.merge(dag_maker.dag_model) + session.commit() + + +def _create_task_state(session, key: str, value: str, dag_run: DagRun) -> None: + row = TaskStateModel( + dag_run_id=dag_run.id, + dag_id=DAG_ID, + run_id=RUN_ID, + task_id=TASK_ID, + map_index=-1, + key=key, + value=value, + ) + session.add(row) + session.flush() + + +class TestTaskStateEndpoint: + @staticmethod + def clear_db(): + clear_db_dags() + clear_db_runs() + clear_db_dag_bundles() + + @pytest.fixture(autouse=True) + def setup(self, dag_maker, session): + self.clear_db() + _create_dag_run(dag_maker, session) + self.dag_run = session.scalar(select(DagRun).where(DagRun.run_id == RUN_ID)) + self._session = session + + def teardown_method(self): + self.clear_db() + + +class TestListTaskState(TestTaskStateEndpoint): + def test_returns_empty_list_when_no_state(self, test_client): + response = test_client.get(BASE_URL) + assert response.status_code == 200 + assert response.json() == {"task_states": [], "total_entries": 0} + + def test_returns_all_keys(self, test_client): + _create_task_state(self._session, "job_id", "spark_001", self.dag_run) + _create_task_state(self._session, "checkpoint", "step_3", self.dag_run) + self._session.commit() + + response = test_client.get(BASE_URL) + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 2 + keys = {item["key"]: item["value"] for item in data["task_states"]} + assert keys == {"job_id": "spark_001", "checkpoint": "step_3"} + + def test_returns_state_metadata_fields(self, test_client): + _create_task_state(self._session, "job_id", "spark_001", self.dag_run) + self._session.commit() + + response = test_client.get(BASE_URL) + item = response.json()["task_states"][0] + assert "updated_at" in item + assert "expires_at" in item + + def test_map_index_isolation(self, test_client): + """map_index=-1 (default) doesn't return rows for other map indices.""" + row = TaskStateModel( + dag_run_id=self.dag_run.id, + dag_id=DAG_ID, + run_id=RUN_ID, + task_id=TASK_ID, + map_index=0, + key="job_id", + value="mapped_app", + ) + self._session.add(row) + self._session.commit() + + response = test_client.get(BASE_URL) + assert response.json()["total_entries"] == 0 + + def test_pagination_limit(self, test_client): + for k in ("a", "b", "c"): + _create_task_state(self._session, k, "v", self.dag_run) + self._session.commit() + + response = test_client.get(f"{BASE_URL}?limit=2") + data = response.json() + assert data["total_entries"] == 3 + assert len(data["task_states"]) == 2 + + def test_pagination_offset(self, test_client): + for k in ("a", "b", "c"): + _create_task_state(self._session, k, "v", self.dag_run) + self._session.commit() + + response = test_client.get(f"{BASE_URL}?limit=2&offset=2") + data = response.json() + assert data["total_entries"] == 3 + assert len(data["task_states"]) == 1 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.get(BASE_URL).status_code == 401 + + +class TestGetTaskState(TestTaskStateEndpoint): + def test_returns_value(self, test_client): + _create_task_state(self._session, "job_id", "spark_001", self.dag_run) + self._session.commit() + + response = test_client.get(f"{BASE_URL}/job_id") + assert response.status_code == 200 + data = response.json() + assert data["key"] == "job_id" + assert data["value"] == "spark_001" + + def test_missing_key_returns_404(self, test_client): + response = test_client.get(f"{BASE_URL}/nonexistent") + assert response.status_code == 404 + + def test_key_with_slash_is_supported(self, test_client): + """Keys containing slashes must work — route uses {key:path}.""" + _create_task_state(self._session, "workflow/step_1", "v", self.dag_run) + self._session.commit() + + response = test_client.get(f"{BASE_URL}/workflow/step_1") + assert response.status_code == 200 + assert response.json()["key"] == "workflow/step_1" + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.get(f"{BASE_URL}/job_id").status_code == 401 + + +class TestSetTaskState(TestTaskStateEndpoint): + def test_creates_new_key(self, test_client): + response = test_client.put(f"{BASE_URL}/job_id", json={"value": "spark_001"}) + assert response.status_code == 204 + + get_resp = test_client.get(f"{BASE_URL}/job_id") + assert get_resp.json()["value"] == "spark_001" + + def test_overwrites_existing_key(self, test_client): + test_client.put(f"{BASE_URL}/job_id", json={"value": "v1"}) + test_client.put(f"{BASE_URL}/job_id", json={"value": "v2"}) + + assert test_client.get(f"{BASE_URL}/job_id").json()["value"] == "v2" + + def test_empty_body_returns_422(self, test_client): + assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code == 422 + + def test_oversized_value_returns_422(self, test_client): + assert test_client.put(f"{BASE_URL}/job_id", json={"value": "x" * 65536}).status_code == 422 + + def test_set_nonexistent_dag_run_returns_404(self, test_client): + """set() raises ValueError when DagRun doesn't exist — should surface as 404.""" + bad_url = f"/dags/{DAG_ID}/dagRuns/nonexistent_run/taskInstances/{TASK_ID}/states/job_id" + response = test_client.put(bad_url, json={"value": "v"}) + assert response.status_code == 404 + + def test_set_nonexistent_task_id_returns_404(self, test_client): + """set() returns 404 when task_id doesn not match any TaskInstance in the run.""" + bad_url = f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/nonexistent_task/states/job_id" + response = test_client.put(bad_url, json={"value": "v"}) + assert response.status_code == 404 + + def test_key_with_slash_is_supported(self, test_client): + response = test_client.put(f"{BASE_URL}/workflow/step_1", json={"value": "v"}) + assert response.status_code == 204 + assert test_client.get(f"{BASE_URL}/workflow/step_1").json()["key"] == "workflow/step_1" + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.put(f"{BASE_URL}/job_id", json={"value": "v"}).status_code == 401 + + +class TestDeleteTaskState(TestTaskStateEndpoint): + def test_deletes_key(self, test_client): + _create_task_state(self._session, "job_id", "spark_001", self.dag_run) + self._session.commit() + + assert test_client.delete(f"{BASE_URL}/job_id").status_code == 204 + assert test_client.get(f"{BASE_URL}/job_id").status_code == 404 + + def test_delete_noop_for_missing_key(self, test_client): + assert test_client.delete(f"{BASE_URL}/nonexistent").status_code == 204 + + def test_only_deletes_target_key(self, test_client): + _create_task_state(self._session, "job_id", "a", self.dag_run) + _create_task_state(self._session, "checkpoint", "b", self.dag_run) + self._session.commit() + + test_client.delete(f"{BASE_URL}/job_id") + + assert test_client.get(f"{BASE_URL}/job_id").status_code == 404 + assert test_client.get(f"{BASE_URL}/checkpoint").json()["value"] == "b" + + def test_key_with_slash_is_supported(self, test_client): + _create_task_state(self._session, "workflow/step_1", "v", self.dag_run) + self._session.commit() + + assert test_client.delete(f"{BASE_URL}/workflow/step_1").status_code == 204 + assert test_client.get(f"{BASE_URL}/workflow/step_1").status_code == 404 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.delete(f"{BASE_URL}/job_id").status_code == 401 + + +class TestClearTaskState(TestTaskStateEndpoint): + def test_clears_all_keys(self, test_client): + for k, v in [("job_id", "a"), ("checkpoint", "b"), ("retry_count", "c")]: + _create_task_state(self._session, k, v, self.dag_run) + self._session.commit() + + assert test_client.delete(BASE_URL).status_code == 204 + assert test_client.get(BASE_URL).json()["total_entries"] == 0 + + def test_all_map_indices_clears_across_mapped_instances(self, test_client): + """all_map_indices=true wipes state for every map index of the task.""" + for map_index in (-1, 0, 1): + row = TaskStateModel( + dag_run_id=self.dag_run.id, + dag_id=DAG_ID, + run_id=RUN_ID, + task_id=TASK_ID, + map_index=map_index, + key="job_id", + value=f"app_{map_index}", + ) + self._session.add(row) + self._session.commit() + + # Default clear only wipes map_index=-1 + assert test_client.delete(BASE_URL).status_code == 204 + # map_index=0 and map_index=1 rows still exist + assert test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 1 + assert test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 1 + + # all_map_indices=true wipes everything + assert test_client.delete(f"{BASE_URL}?all_map_indices=true").status_code == 204 + assert test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 0 + assert test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 0 + + def test_key_with_slash_is_supported(self, test_client): + _create_task_state(self._session, "workflow/step_1", "v", self.dag_run) + self._session.commit() + + assert test_client.delete(BASE_URL).status_code == 204 + assert test_client.get(BASE_URL).json()["total_entries"] == 0 + + def test_unauthorized_returns_401(self, unauthenticated_test_client): + assert unauthenticated_test_client.delete(BASE_URL).status_code == 401 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..6886bdd3aa704 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -49,6 +49,27 @@ class AssetAliasResponse(BaseModel): group: Annotated[str, Field(title="Group")] +class AssetStateBody(BaseModel): + """ + Request body for setting an asset state value. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(max_length=65535, title="Value")] + + +class AssetStateResponse(BaseModel): + """ + A single asset state key/value pair with metadata. + """ + + key: Annotated[str, Field(title="Key")] + value: Annotated[str, Field(title="Value")] + updated_at: Annotated[datetime, Field(title="Updated At")] + + class AssetWatcherResponse(BaseModel): """ Asset watcher serializer for responses. @@ -906,6 +927,28 @@ class TaskOutletAssetReference(BaseModel): updated_at: Annotated[datetime, Field(title="Updated At")] +class TaskStateBody(BaseModel): + """ + Request body for setting a task state value. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(max_length=65535, title="Value")] + + +class TaskStateResponse(BaseModel): + """ + A single task state key/value pair with metadata. + """ + + key: Annotated[str, Field(title="Key")] + value: Annotated[str, Field(title="Value")] + updated_at: Annotated[datetime, Field(title="Updated At")] + expires_at: Annotated[datetime | None, Field(title="Expires At")] = None + + class TimeDelta(BaseModel): """ TimeDelta can be used to interact with datetime.timedelta objects. @@ -1136,6 +1179,15 @@ class AssetResponse(BaseModel): last_asset_event: LastAssetEventResponse | None = None +class AssetStateCollectionResponse(BaseModel): + """ + All asset state entries for an asset. + """ + + asset_states: Annotated[list[AssetStateResponse], Field(title="Asset States")] + total_entries: Annotated[int, Field(title="Total Entries")] + + class BackfillPostBody(BaseModel): """ Object used for create backfill request. @@ -1854,6 +1906,15 @@ class TaskResponse(BaseModel): ] +class TaskStateCollectionResponse(BaseModel): + """ + All task state entries for a task instance. + """ + + task_states: Annotated[list[TaskStateResponse], Field(title="Task States")] + total_entries: Annotated[int, Field(title="Total Entries")] + + class VariableCollectionResponse(BaseModel): """ Variable Collection serializer for responses.