From 701ca4ff4159df00964f377f9b7352c2fb85f2fb Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 29 Apr 2026 12:37:56 +0530 Subject: [PATCH 1/7] AIP-103: Add Execution API endpoints for task and asset states --- .../execution_api/datamodels/asset_state.py | 32 +++ .../execution_api/datamodels/task_state.py | 32 +++ .../execution_api/routes/__init__.py | 4 + .../execution_api/routes/asset_state.py | 104 +++++++++ .../execution_api/routes/task_state.py | 124 ++++++++++ airflow-core/src/airflow/state/__init__.py | 11 + airflow-core/src/airflow/state/metastore.py | 1 + .../versions/head/test_asset_state.py | 136 +++++++++++ .../versions/head/test_task_state.py | 214 ++++++++++++++++++ .../airflow/sdk/api/datamodels/_generated.py | 44 ++++ 10 files changed, 702 insertions(+) create mode 100644 airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_state.py create mode 100644 airflow-core/src/airflow/api_fastapi/execution_api/datamodels/task_state.py create mode 100644 airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py create mode 100644 airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py create mode 100644 airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py create mode 100644 airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_state.py new file mode 100644 index 0000000000000..ec773201c7e2f --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_state.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.api_fastapi.core_api.base import StrictBaseModel + + +class AssetStateResponse(StrictBaseModel): + """Asset state value returned to a worker.""" + + value: str + + +class AssetStatePutBody(StrictBaseModel): + """Request body for setting an asset state value.""" + + value: str diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/task_state.py new file mode 100644 index 0000000000000..3200f3177af35 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/task_state.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.api_fastapi.core_api.base import StrictBaseModel + + +class TaskStateResponse(StrictBaseModel): + """Task state value returned to a worker.""" + + value: str + + +class TaskStatePutBody(StrictBaseModel): + """Request body for setting a task state value.""" + + value: str diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py index a076592d6471a..06f07aee82389 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py @@ -21,6 +21,7 @@ from airflow.api_fastapi.execution_api.routes import ( asset_events, + asset_state, assets, connections, dag_runs, @@ -29,6 +30,7 @@ hitl, task_instances, task_reschedules, + task_state, variables, xcoms, ) @@ -52,5 +54,7 @@ authenticated_router.include_router(variables.router, prefix="/variables", tags=["Variables"]) authenticated_router.include_router(xcoms.router, prefix="/xcoms", tags=["XComs"]) authenticated_router.include_router(hitl.router, prefix="/hitlDetails", tags=["Human in the Loop"]) +authenticated_router.include_router(task_state.router, prefix="/state/ti", tags=["Task State"]) +authenticated_router.include_router(asset_state.router, prefix="/state/asset", tags=["Asset State"]) execution_api_router.include_router(authenticated_router) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py new file mode 100644 index 0000000000000..fe7137574ca9d --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Execution API routes for asset state. + +Per-task asset registration checks (i.e. enforcing that the requesting task +references the asset as inlet or outlet) are intentionally not implemented +here. AIP-103 sketches a write/read asymmetry but explicitly defers the +precise authorization rules to AIP-93 as mentioned in AIP-103. +""" + +from __future__ import annotations + +from typing import Annotated + +from cadwyn import VersionedAPIRouter +from fastapi import HTTPException, Path, status + +from airflow._shared.state import AssetScope +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.execution_api.datamodels.asset_state import ( + AssetStatePutBody, + AssetStateResponse, +) +from airflow.models.asset import AssetModel +from airflow.state import get_state_backend + +# TODO(AIP-103): enforce that the requesting task is registered with the asset +# (via task_inlet_asset_reference or task_outlet_asset_reference) before +# allowing reads/writes. Currently any task with a valid execution token can +# access any asset's state — the same gap exists in /assets and /asset-events. +# Proper fix is a unified asset-registration check across all asset routes, +# not just here. +router = VersionedAPIRouter( + responses={ + status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, + status.HTTP_404_NOT_FOUND: {"description": "Not found"}, + }, +) + + +@router.get("/{asset_id}/{key}") +def get_asset_state( + asset_id: int, + key: Annotated[str, Path(min_length=1)], +) -> AssetStateResponse: + """Get an asset state.""" + value = get_state_backend().get(AssetScope(asset_id=asset_id), key) + if value is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Asset state key {key!r} not found", + }, + ) + return AssetStateResponse(value=value) + + +@router.put("/{asset_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +def set_asset_state( + asset_id: int, + key: Annotated[str, Path(min_length=1)], + body: AssetStatePutBody, + session: SessionDep, +) -> None: + """Set an asset state.""" + if session.get(AssetModel, asset_id) is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"reason": "not_found", "message": f"Asset {asset_id} not found"}, + ) + get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value) + + +@router.delete("/{asset_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +def delete_asset_state( + asset_id: int, + key: Annotated[str, Path(min_length=1)], +) -> None: + """Delete an asset state.""" + get_state_backend().delete(AssetScope(asset_id=asset_id), key) + + +@router.delete("/{asset_id}", status_code=status.HTTP_204_NO_CONTENT) +def clear_asset_state( + asset_id: int, +) -> None: + """Delete all state keys for an asset.""" + get_state_backend().clear(AssetScope(asset_id=asset_id)) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py new file mode 100644 index 0000000000000..f8b49c8a3eb8d --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Annotated +from uuid import UUID + +from cadwyn import VersionedAPIRouter +from fastapi import HTTPException, Path, Security, status +from sqlalchemy.orm import Session + +from airflow._shared.state import TaskScope +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.execution_api.datamodels.task_state import ( + TaskStatePutBody, + TaskStateResponse, +) +from airflow.api_fastapi.execution_api.security import require_auth +from airflow.models.taskinstance import TaskInstance as TI +from airflow.state import get_state_backend + +router = VersionedAPIRouter( + responses={ + status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, + status.HTTP_403_FORBIDDEN: {"description": "Access denied"}, + status.HTTP_404_NOT_FOUND: {"description": "Not found"}, + }, + dependencies=[Security(require_auth, scopes=["ti:self"])], +) + + +def _get_task_scope_for_ti(task_instance_id: UUID, session: Session) -> TaskScope: + ti = session.get(TI, task_instance_id) + if ti is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Task instance {task_instance_id} not found", + }, + ) + return TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) + + +@router.get("/{task_instance_id}/{key}") +def get_task_state( + task_instance_id: UUID, + key: Annotated[str, Path(min_length=1)], + session: SessionDep, +) -> TaskStateResponse: + """Get value for a task state.""" + scope = _get_task_scope_for_ti(task_instance_id, session) + value = get_state_backend().get(scope, key) + if value is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Task state key {key!r} not found", + }, + ) + return TaskStateResponse(value=value) + + +@router.put("/{task_instance_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +def set_task_state( + task_instance_id: UUID, + key: Annotated[str, Path(min_length=1)], + body: TaskStatePutBody, + session: SessionDep, +) -> None: + """Set a task state key, creating or updating the row.""" + scope = _get_task_scope_for_ti(task_instance_id, session) + get_state_backend().set(scope, key, body.value) + + +@router.delete("/{task_instance_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +def delete_task_state( + task_instance_id: UUID, + key: Annotated[str, Path(min_length=1)], + session: SessionDep, +) -> None: + """Delete a single task state key.""" + scope = _get_task_scope_for_ti(task_instance_id, session) + get_state_backend().delete(scope, key) + + +@router.delete("/{task_instance_id}", status_code=status.HTTP_204_NO_CONTENT) +def clear_task_state( + task_instance_id: UUID, + session: SessionDep, +) -> None: + """ + Delete all task state keys for this task. + + For mapped tasks this clears state across every map_index of the task; the + requesting worker's ``map_index`` is intentionally dropped from the scope. In short, + this wipe's off a task's state entirely. + """ + ti = session.get(TI, task_instance_id) + if ti is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Task instance {task_instance_id} not found", + }, + ) + scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id) + get_state_backend().clear(scope) diff --git a/airflow-core/src/airflow/state/__init__.py b/airflow-core/src/airflow/state/__init__.py index 89fa0801a06f0..f054504a7c201 100644 --- a/airflow-core/src/airflow/state/__init__.py +++ b/airflow-core/src/airflow/state/__init__.py @@ -36,3 +36,14 @@ def resolve_state_backend() -> type[BaseStateBackend]: f"Your custom state backend `{clazz.__name__}` is not a subclass of `BaseStateBackend`." ) return clazz + + +_backend_instance: BaseStateBackend | None = None + + +def get_state_backend() -> BaseStateBackend: + """Return a cached instance of the configured state backend.""" + global _backend_instance + if _backend_instance is None: + _backend_instance = resolve_state_backend()() + return _backend_instance diff --git a/airflow-core/src/airflow/state/metastore.py b/airflow-core/src/airflow/state/metastore.py index 3382dad81fc65..e40a19e9c6238 100644 --- a/airflow-core/src/airflow/state/metastore.py +++ b/airflow-core/src/airflow/state/metastore.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING from sqlalchemy import delete, select +from typing_extensions import assert_never from airflow._shared.state import AssetScope, BaseStateBackend, StateScope, TaskScope from airflow._shared.timezones import timezone diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py new file mode 100644 index 0000000000000..88e73ceefe2c1 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from sqlalchemy import delete, select + +from airflow.models.asset import AssetModel +from airflow.models.asset_state import AssetStateModel +from airflow.utils.session import create_session + +if TYPE_CHECKING: + from fastapi.testclient import TestClient + from sqlalchemy.orm import Session + +pytestmark = pytest.mark.db_test + + +@pytest.fixture(autouse=True) +def reset_state_tables(): + with create_session() as session: + session.execute(delete(AssetStateModel)) + session.execute(delete(AssetModel)) + + +@pytest.fixture +def asset(session: Session) -> AssetModel: + asset = AssetModel(name="test_asset", uri="s3://bucket/test", group="asset") + session.add(asset) + session.commit() + return asset + + +def _api_url(asset_id: int, key: str | None = None) -> str: + base = f"/execution/state/asset/{asset_id}" + return f"{base}/{key}" if key else base + + +class TestGetAssetState: + def test_get_returns_value(self, client: TestClient, asset: AssetModel): + client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + + response = client.get(_api_url(asset.id, "watermark")) + + assert response.status_code == 200 + assert response.json() == {"value": "2026-04-29"} + + def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel): + response = client.get(_api_url(asset.id, "never_set")) + + assert response.status_code == 404 + assert response.json()["detail"]["reason"] == "not_found" + + +class TestPutAssetState: + def test_put_creates_row(self, client: TestClient, asset: AssetModel): + response = client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + + assert response.status_code == 204 + with create_session() as session: + row = session.scalar( + select(AssetStateModel).where( + AssetStateModel.asset_id == asset.id, + AssetStateModel.key == "watermark", + ) + ) + assert row is not None + assert row.value == "2026-04-29" + + def test_put_overwrites_existing(self, client: TestClient, asset: AssetModel): + client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-28"}) + + response = client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + + assert response.status_code == 204 + assert client.get(_api_url(asset.id, "watermark")).json() == {"value": "2026-04-29"} + + def test_put_empty_body_returns_422(self, client: TestClient, asset: AssetModel): + response = client.put(_api_url(asset.id, "watermark"), json={}) + + assert response.status_code == 422 + + def test_put_extra_field_returns_422(self, client: TestClient, asset: AssetModel): + response = client.put(_api_url(asset.id, "watermark"), json={"value": "x", "extra": "y"}) + + assert response.status_code == 422 + + def test_put_unknown_asset_returns_404(self, client: TestClient): + response = client.put(_api_url(999999, "watermark"), json={"value": "x"}) + + assert response.status_code == 404 + assert "999999" in response.json()["detail"]["message"] + + +class TestDeleteAssetState: + def test_delete_removes_key(self, client: TestClient, asset: AssetModel): + client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + + response = client.delete(_api_url(asset.id, "watermark")) + + assert response.status_code == 204 + assert client.get(_api_url(asset.id, "watermark")).status_code == 404 + + def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel): + response = client.delete(_api_url(asset.id, "never_existed")) + + assert response.status_code == 204 + + +class TestClearAssetState: + def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): + for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash", "c")]: + client.put(_api_url(asset.id, k), json={"value": v}) + + response = client.delete(_api_url(asset.id)) + + assert response.status_code == 204 + with create_session() as session: + row = session.scalar(select(AssetStateModel).where(AssetStateModel.asset_id == asset.id)) + assert row is None diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py new file mode 100644 index 0000000000000..3ede3ad629c55 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py @@ -0,0 +1,214 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING +from uuid import uuid4 + +import pytest +from sqlalchemy import delete, select + +from airflow._shared.timezones import timezone +from airflow.models.dagrun import DagRun +from airflow.models.task_state import TaskStateModel +from airflow.utils.session import create_session + +if TYPE_CHECKING: + from fastapi.testclient import TestClient + + from tests_common.pytest_plugin import CreateTaskInstance + +pytestmark = pytest.mark.db_test + + +@pytest.fixture(autouse=True) +def reset_state_tables(): + with create_session() as session: + session.execute(delete(TaskStateModel)) + session.execute(delete(DagRun)) + + +def _api_url(ti_id, key: str | None = None) -> str: + base = f"/execution/state/ti/{ti_id}" + return f"{base}/{key}" if key else base + + +class TestGetTaskState: + def test_get_returns_value(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + client.put(_api_url(ti.id, "job_id"), json={"value": "spark_001"}) + + response = client.get(_api_url(ti.id, "job_id")) + + assert response.status_code == 200 + assert response.json() == {"value": "spark_001"} + + def test_get_missing_key_returns_404(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.get(_api_url(ti.id, "never_set")) + + assert response.status_code == 404 + assert response.json()["detail"]["reason"] == "not_found" + + def test_get_missing_ti_returns_404(self, client: TestClient): + response = client.get(_api_url(uuid4(), "any_key")) + + assert response.status_code == 404 + assert "Task instance" in response.json()["detail"]["message"] + + +class TestPutTaskState: + def test_put_creates_row(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.put(_api_url(ti.id, "job_id"), json={"value": "spark_001"}) + + assert response.status_code == 204 + with create_session() as session: + row = session.scalar( + select(TaskStateModel).where( + TaskStateModel.dag_id == ti.dag_id, + TaskStateModel.run_id == ti.run_id, + TaskStateModel.task_id == ti.task_id, + TaskStateModel.key == "job_id", + ) + ) + assert row is not None + assert row.value == "spark_001" + + def test_put_overwrites_existing(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + client.put(_api_url(ti.id, "job_id"), json={"value": "spark_001"}) + + response = client.put(_api_url(ti.id, "job_id"), json={"value": "spark_002"}) + + assert response.status_code == 204 + assert client.get(_api_url(ti.id, "job_id")).json() == {"value": "spark_002"} + + def test_put_empty_body_returns_422(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.put(_api_url(ti.id, "job_id"), json={}) + + assert response.status_code == 422 + + def test_put_extra_field_returns_422(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.put(_api_url(ti.id, "job_id"), json={"value": "x", "extra": "y"}) + + assert response.status_code == 422 + + def test_put_null_value_returns_422(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.put(_api_url(ti.id, "job_id"), json={"value": None}) + + assert response.status_code == 422 + + def test_put_missing_ti_returns_404(self, client: TestClient): + response = client.put(_api_url(uuid4(), "job_id"), json={"value": "x"}) + + assert response.status_code == 404 + + +class TestDeleteTaskState: + def test_delete_removes_key(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + client.put(_api_url(ti.id, "job_id"), json={"value": "spark_001"}) + + response = client.delete(_api_url(ti.id, "job_id")) + + assert response.status_code == 204 + assert client.get(_api_url(ti.id, "job_id")).status_code == 404 + + def test_delete_missing_key_is_noop(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.delete(_api_url(ti.id, "never_existed")) + + assert response.status_code == 204 + + def test_delete_only_targets_one_key(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + client.put(_api_url(ti.id, "job_id"), json={"value": "a"}) + client.put(_api_url(ti.id, "checkpoint"), json={"value": "b"}) + + client.delete(_api_url(ti.id, "job_id")) + + assert client.get(_api_url(ti.id, "job_id")).status_code == 404 + assert client.get(_api_url(ti.id, "checkpoint")).json() == {"value": "b"} + + +class TestClearTaskState: + def test_clear_removes_all_keys(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + for k, v in [("job_id", "a"), ("checkpoint", "b"), ("retry_count", "c")]: + client.put(_api_url(ti.id, k), json={"value": v}) + + response = client.delete(_api_url(ti.id)) + + assert response.status_code == 204 + with create_session() as session: + count = session.scalar( + select(TaskStateModel) + .where(TaskStateModel.dag_id == ti.dag_id, TaskStateModel.task_id == ti.task_id) + .with_only_columns(TaskStateModel.key) + ) + assert count is None + + def test_clear_when_empty_is_noop(self, client: TestClient, create_task_instance: CreateTaskInstance): + ti = create_task_instance() + + response = client.delete(_api_url(ti.id)) + + assert response.status_code == 204 + + def test_clear_from_mapped_ti_wipes_all_map_indices( + self, client: TestClient, create_task_instance: CreateTaskInstance + ): + """Clear-all from a mapped instance wipes state across mapped tasks.""" + ti = create_task_instance(map_index=2) + with create_session() as session: + now = timezone.utcnow() + for idx in (0, 1, 2): + session.add( + TaskStateModel( + dag_run_id=ti.dag_run.id, + dag_id=ti.dag_id, + run_id=ti.run_id, + task_id=ti.task_id, + map_index=idx, + key="job_id", + value=f"app_{idx}", + updated_at=now, + ) + ) + session.commit() + + response = client.delete(_api_url(ti.id)) + + assert response.status_code == 204 + with create_session() as session: + remaining = session.scalars( + select(TaskStateModel).where( + TaskStateModel.dag_id == ti.dag_id, + TaskStateModel.task_id == ti.task_id, + ) + ).all() + assert remaining == [] diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index bafd6890f46bd..ca91fb7082553 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -63,6 +63,28 @@ class AssetProfile(BaseModel): type: Annotated[str, Field(title="Type")] +class AssetStatePutBody(BaseModel): + """ + Request body for setting an asset state value. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(title="Value")] + + +class AssetStateResponse(BaseModel): + """ + Asset state value returned to a worker. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(title="Value")] + + class ConnectionResponse(BaseModel): """ Connection schema for responses with fields that are needed for Runtime. @@ -343,6 +365,28 @@ class TaskInstanceState(str, Enum): DEFERRED = "deferred" +class TaskStatePutBody(BaseModel): + """ + Request body for setting a task state value. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(title="Value")] + + +class TaskStateResponse(BaseModel): + """ + Task state value returned to a worker. + """ + + model_config = ConfigDict( + extra="forbid", + ) + value: Annotated[str, Field(title="Value")] + + class TaskStatesResponse(BaseModel): """ Response for task states with run_id, task and state. From 537d73b6a349e560d93184bbb0f2f2b690abe948 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 30 Apr 2026 13:58:56 +0530 Subject: [PATCH 2/7] Adapting the APIs to match first PR re clear operation --- .../execution_api/routes/task_state.py | 17 +++++---- .../versions/head/test_task_state.py | 36 +++++++++++++++---- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py index f8b49c8a3eb8d..7b2ea2f545abe 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py @@ -20,7 +20,7 @@ from uuid import UUID from cadwyn import VersionedAPIRouter -from fastapi import HTTPException, Path, Security, status +from fastapi import HTTPException, Path, Query, Security, status from sqlalchemy.orm import Session from airflow._shared.state import TaskScope @@ -103,13 +103,16 @@ def delete_task_state( def clear_task_state( task_instance_id: UUID, session: SessionDep, + all_map_indices: Annotated[bool, Query()] = False, ) -> None: """ - Delete all task state keys for this task. + Delete all task state keys for this task instance. - For mapped tasks this clears state across every map_index of the task; the - requesting worker's ``map_index`` is intentionally dropped from the scope. In short, - this wipe's off a task's state entirely. + By default, only keys for the requesting TI's exact ``map_index`` are + cleared — same isolation as single-key DELETE. Pass + ``?all_map_indices=true`` to wipe state across every mapped instance of + the task; the SDK forwards this when the caller asks for a fleet-wide + reset. """ ti = session.get(TI, task_instance_id) if ti is None: @@ -120,5 +123,5 @@ def clear_task_state( "message": f"Task instance {task_instance_id} not found", }, ) - scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id) - get_state_backend().clear(scope) + scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) + get_state_backend().clear(scope, all_map_indices=all_map_indices) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py index 3ede3ad629c55..cb27b94cee139 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py @@ -179,14 +179,10 @@ def test_clear_when_empty_is_noop(self, client: TestClient, create_task_instance assert response.status_code == 204 - def test_clear_from_mapped_ti_wipes_all_map_indices( - self, client: TestClient, create_task_instance: CreateTaskInstance - ): - """Clear-all from a mapped instance wipes state across mapped tasks.""" - ti = create_task_instance(map_index=2) + def _seed_fleet_rows(self, ti, indices: tuple[int, ...]) -> None: with create_session() as session: now = timezone.utcnow() - for idx in (0, 1, 2): + for idx in indices: session.add( TaskStateModel( dag_run_id=ti.dag_run.id, @@ -201,8 +197,36 @@ def test_clear_from_mapped_ti_wipes_all_map_indices( ) session.commit() + def test_clear_default_only_clears_this_map_index( + self, client: TestClient, create_task_instance: CreateTaskInstance + ): + """Clear without the query param only wipes the requesting TI's own map_index.""" + ti = create_task_instance(map_index=2) + self._seed_fleet_rows(ti, (0, 1, 2)) + response = client.delete(_api_url(ti.id)) + assert response.status_code == 204 + with create_session() as session: + remaining_indices = sorted( + session.scalars( + select(TaskStateModel.map_index).where( + TaskStateModel.dag_id == ti.dag_id, + TaskStateModel.task_id == ti.task_id, + ) + ).all() + ) + assert remaining_indices == [0, 1] + + def test_clear_with_all_map_indices_query_param_wipes_fleet( + self, client: TestClient, create_task_instance: CreateTaskInstance + ): + """Clear with ?all_map_indices=true wipes state for every mapped instance.""" + ti = create_task_instance(map_index=2) + self._seed_fleet_rows(ti, (0, 1, 2)) + + response = client.delete(_api_url(ti.id), params={"all_map_indices": "true"}) + assert response.status_code == 204 with create_session() as session: remaining = session.scalars( From ece2d7eca944816e39aa56ee89e41aef31b934b0 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 30 Apr 2026 15:21:38 +0530 Subject: [PATCH 3/7] fixing the static checks --- airflow-core/src/airflow/state/metastore.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/src/airflow/state/metastore.py b/airflow-core/src/airflow/state/metastore.py index e40a19e9c6238..3382dad81fc65 100644 --- a/airflow-core/src/airflow/state/metastore.py +++ b/airflow-core/src/airflow/state/metastore.py @@ -20,7 +20,6 @@ from typing import TYPE_CHECKING from sqlalchemy import delete, select -from typing_extensions import assert_never from airflow._shared.state import AssetScope, BaseStateBackend, StateScope, TaskScope from airflow._shared.timezones import timezone From 0add86e5a3a8301d480e297c6466d723f0cf2be5 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 30 Apr 2026 16:27:55 +0530 Subject: [PATCH 4/7] update to using asset name instead of id --- .../execution_api/routes/asset_state.py | 57 ++++++++++++------- .../versions/head/test_asset_state.py | 38 ++++++------- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py index fe7137574ca9d..dfb2141029510 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -17,10 +17,13 @@ """ Execution API routes for asset state. -Per-task asset registration checks (i.e. enforcing that the requesting task -references the asset as inlet or outlet) are intentionally not implemented -here. AIP-103 sketches a write/read asymmetry but explicitly defers the -precise authorization rules to AIP-93 as mentioned in AIP-103. +Asset state is keyed by asset *name* (not integer id) in the URL — asset names +are unique, and callers (task SDK accessors) have the name from their Asset +object without needing a DB lookup. The route resolves name → asset_id +internally for the state backend scope. + +Per-task asset registration checks are intentionally not implemented here +(deferred to AIP-93 — see TODO comment below). """ from __future__ import annotations @@ -29,6 +32,7 @@ from cadwyn import VersionedAPIRouter from fastapi import HTTPException, Path, status +from sqlalchemy import select from airflow._shared.state import AssetScope from airflow.api_fastapi.common.db.common import SessionDep @@ -53,12 +57,25 @@ ) -@router.get("/{asset_id}/{key}") +def _resolve_asset_id(name: str, session: SessionDep) -> int: + """Resolve asset name → integer asset_id, 404 if not found.""" + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name == name)) + if asset_id is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"reason": "not_found", "message": f"Asset {name!r} not found"}, + ) + return asset_id + + +@router.get("/{name}/{key}") def get_asset_state( - asset_id: int, + name: Annotated[str, Path(min_length=1)], key: Annotated[str, Path(min_length=1)], + session: SessionDep, ) -> AssetStateResponse: - """Get an asset state.""" + """Get an asset state value.""" + asset_id = _resolve_asset_id(name, session) value = get_state_backend().get(AssetScope(asset_id=asset_id), key) if value is None: raise HTTPException( @@ -71,34 +88,34 @@ def get_asset_state( return AssetStateResponse(value=value) -@router.put("/{asset_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +@router.put("/{name}/{key}", status_code=status.HTTP_204_NO_CONTENT) def set_asset_state( - asset_id: int, + name: Annotated[str, Path(min_length=1)], key: Annotated[str, Path(min_length=1)], body: AssetStatePutBody, session: SessionDep, ) -> None: - """Set an asset state.""" - if session.get(AssetModel, asset_id) is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail={"reason": "not_found", "message": f"Asset {asset_id} not found"}, - ) + """Set an asset state value.""" + asset_id = _resolve_asset_id(name, session) get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value) -@router.delete("/{asset_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/{name}/{key}", status_code=status.HTTP_204_NO_CONTENT) def delete_asset_state( - asset_id: int, + name: Annotated[str, Path(min_length=1)], key: Annotated[str, Path(min_length=1)], + session: SessionDep, ) -> None: - """Delete an asset state.""" + """Delete a single asset state key.""" + asset_id = _resolve_asset_id(name, session) get_state_backend().delete(AssetScope(asset_id=asset_id), key) -@router.delete("/{asset_id}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/{name}", status_code=status.HTTP_204_NO_CONTENT) def clear_asset_state( - asset_id: int, + name: Annotated[str, Path(min_length=1)], + session: SessionDep, ) -> None: """Delete all state keys for an asset.""" + asset_id = _resolve_asset_id(name, session) get_state_backend().clear(AssetScope(asset_id=asset_id)) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py index 88e73ceefe2c1..ae41398b42758 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py @@ -47,22 +47,22 @@ def asset(session: Session) -> AssetModel: return asset -def _api_url(asset_id: int, key: str | None = None) -> str: - base = f"/execution/state/asset/{asset_id}" +def _api_url(name: str, key: str | None = None) -> str: + base = f"/execution/state/asset/{name}" return f"{base}/{key}" if key else base class TestGetAssetState: def test_get_returns_value(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) - response = client.get(_api_url(asset.id, "watermark")) + response = client.get(_api_url(asset.name, "watermark")) assert response.status_code == 200 assert response.json() == {"value": "2026-04-29"} def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel): - response = client.get(_api_url(asset.id, "never_set")) + response = client.get(_api_url(asset.name, "never_set")) assert response.status_code == 404 assert response.json()["detail"]["reason"] == "not_found" @@ -70,7 +70,7 @@ def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel class TestPutAssetState: def test_put_creates_row(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + response = client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) assert response.status_code == 204 with create_session() as session: @@ -84,41 +84,41 @@ def test_put_creates_row(self, client: TestClient, asset: AssetModel): assert row.value == "2026-04-29" def test_put_overwrites_existing(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-28"}) + client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-28"}) - response = client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + response = client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) assert response.status_code == 204 - assert client.get(_api_url(asset.id, "watermark")).json() == {"value": "2026-04-29"} + assert client.get(_api_url(asset.name, "watermark")).json() == {"value": "2026-04-29"} def test_put_empty_body_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.id, "watermark"), json={}) + response = client.put(_api_url(asset.name, "watermark"), json={}) assert response.status_code == 422 def test_put_extra_field_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.id, "watermark"), json={"value": "x", "extra": "y"}) + response = client.put(_api_url(asset.name, "watermark"), json={"value": "x", "extra": "y"}) assert response.status_code == 422 def test_put_unknown_asset_returns_404(self, client: TestClient): - response = client.put(_api_url(999999, "watermark"), json={"value": "x"}) + response = client.put(_api_url("nonexistent", "watermark"), json={"value": "x"}) assert response.status_code == 404 - assert "999999" in response.json()["detail"]["message"] + assert "nonexistent" in response.json()["detail"]["message"] class TestDeleteAssetState: def test_delete_removes_key(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.id, "watermark"), json={"value": "2026-04-29"}) + client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) - response = client.delete(_api_url(asset.id, "watermark")) + response = client.delete(_api_url(asset.name, "watermark")) assert response.status_code == 204 - assert client.get(_api_url(asset.id, "watermark")).status_code == 404 + assert client.get(_api_url(asset.name, "watermark")).status_code == 404 def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel): - response = client.delete(_api_url(asset.id, "never_existed")) + response = client.delete(_api_url(asset.name, "never_existed")) assert response.status_code == 204 @@ -126,9 +126,9 @@ def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel) class TestClearAssetState: def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash", "c")]: - client.put(_api_url(asset.id, k), json={"value": v}) + client.put(_api_url(asset.name, k), json={"value": v}) - response = client.delete(_api_url(asset.id)) + response = client.delete(_api_url(asset.name)) assert response.status_code == 204 with create_session() as session: From 96e92bffdd17e48f86527833c96e172dadd41350 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 1 May 2026 19:09:18 +0530 Subject: [PATCH 5/7] Review comments from kaxil --- .../execution_api/routes/asset_state.py | 10 ++++---- .../execution_api/routes/task_state.py | 25 +++++++++++++------ .../execution_api/versions/__init__.py | 3 ++- .../execution_api/versions/v2026_04_17.py | 19 +++++++++++++- airflow-core/src/airflow/state/__init__.py | 7 +++++- .../versions/head/test_asset_state.py | 25 ++++++++++++++++++- .../versions/head/test_task_state.py | 13 +++++----- 7 files changed, 79 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py index dfb2141029510..c528b74581dcb 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -59,7 +59,7 @@ def _resolve_asset_id(name: str, session: SessionDep) -> int: """Resolve asset name → integer asset_id, 404 if not found.""" - asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name == name)) + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name == name, AssetModel.active.has())) if asset_id is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -76,7 +76,7 @@ def get_asset_state( ) -> AssetStateResponse: """Get an asset state value.""" asset_id = _resolve_asset_id(name, session) - value = get_state_backend().get(AssetScope(asset_id=asset_id), key) + value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) if value is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -97,7 +97,7 @@ def set_asset_state( ) -> None: """Set an asset state value.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value) + get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) @router.delete("/{name}/{key}", status_code=status.HTTP_204_NO_CONTENT) @@ -108,7 +108,7 @@ def delete_asset_state( ) -> None: """Delete a single asset state key.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().delete(AssetScope(asset_id=asset_id), key) + get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) @router.delete("/{name}", status_code=status.HTTP_204_NO_CONTENT) @@ -118,4 +118,4 @@ def clear_asset_state( ) -> None: """Delete all state keys for an asset.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().clear(AssetScope(asset_id=asset_id)) + get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py index 7b2ea2f545abe..08d7cb4b8b4aa 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py @@ -64,7 +64,7 @@ def get_task_state( ) -> TaskStateResponse: """Get value for a task state.""" scope = _get_task_scope_for_ti(task_instance_id, session) - value = get_state_backend().get(scope, key) + value = get_state_backend().get(scope, key, session=session) if value is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -85,7 +85,7 @@ def set_task_state( ) -> None: """Set a task state key, creating or updating the row.""" scope = _get_task_scope_for_ti(task_instance_id, session) - get_state_backend().set(scope, key, body.value) + get_state_backend().set(scope, key, body.value, session=session) @router.delete("/{task_instance_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) @@ -96,7 +96,7 @@ def delete_task_state( ) -> None: """Delete a single task state key.""" scope = _get_task_scope_for_ti(task_instance_id, session) - get_state_backend().delete(scope, key) + get_state_backend().delete(scope, key, session=session) @router.delete("/{task_instance_id}", status_code=status.HTTP_204_NO_CONTENT) @@ -109,10 +109,19 @@ def clear_task_state( Delete all task state keys for this task instance. By default, only keys for the requesting TI's exact ``map_index`` are - cleared — same isolation as single-key DELETE. Pass - ``?all_map_indices=true`` to wipe state across every mapped instance of - the task; the SDK forwards this when the caller asks for a fleet-wide - reset. + cleared — same isolation as DELETE endpoint above. + + Pass ``?all_map_indices=true`` to wipe state for every mapped sibling of + the task in the same DAG run. This is intentionally fleet-wide: the + ``ti:self`` JWT authentication scope authenticates that the caller is + a legitimate member of the mapped task group, and grants it authority + to reset shared task state on behalf of the whole group. + The SDK only forwards this flag when the user calls ``task_state.clear(all_map_indices=True)`` + explicitly, so the expanded scope is always an explicit opt-in by the task author. + + For non-mapped tasks (``map_index=-1``), there is only ever one index, so + ``?all_map_indices=true`` is functionally identical to the default and is + accepted without error. """ ti = session.get(TI, task_instance_id) if ti is None: @@ -124,4 +133,4 @@ def clear_task_state( }, ) scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) - get_state_backend().clear(scope, all_map_indices=all_map_indices) + get_state_backend().clear(scope, all_map_indices=all_map_indices, session=session) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 0a33e2e3af55e..4d36d00528d3a 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -40,13 +40,14 @@ MovePreviousRunEndpoint, RemoveUpstreamMapIndexesField, ) -from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddTeamNameField +from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddStateEndpoints, AddTeamNameField bundle = VersionBundle( HeadVersion(), Version( "2026-04-17", AddTeamNameField, + AddStateEndpoints, ), Version( "2026-04-06", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py index e7cd9d331a591..a4aed364cc54f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py @@ -17,7 +17,7 @@ from __future__ import annotations -from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext @@ -34,3 +34,20 @@ def remove_team_name_field(response: ResponseInfo) -> None: # type: ignore[misc """Remove the ``team_name`` field from dag_run for older API versions.""" if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): response.body["dag_run"].pop("team_name", None) + + +class AddStateEndpoints(VersionChange): + """Add task state and asset state CRUD endpoints.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + endpoint("/state/ti/{task_instance_id}/{key}", ["GET"]).didnt_exist, + endpoint("/state/ti/{task_instance_id}/{key}", ["PUT"]).didnt_exist, + endpoint("/state/ti/{task_instance_id}/{key}", ["DELETE"]).didnt_exist, + endpoint("/state/ti/{task_instance_id}", ["DELETE"]).didnt_exist, + endpoint("/state/asset/{name}/{key}", ["GET"]).didnt_exist, + endpoint("/state/asset/{name}/{key}", ["PUT"]).didnt_exist, + endpoint("/state/asset/{name}/{key}", ["DELETE"]).didnt_exist, + endpoint("/state/asset/{name}", ["DELETE"]).didnt_exist, + ) diff --git a/airflow-core/src/airflow/state/__init__.py b/airflow-core/src/airflow/state/__init__.py index f054504a7c201..109b9e9b37f04 100644 --- a/airflow-core/src/airflow/state/__init__.py +++ b/airflow-core/src/airflow/state/__init__.py @@ -17,6 +17,8 @@ # under the License. from __future__ import annotations +import threading + from airflow._shared.state import ( AssetScope as AssetScope, BaseStateBackend as BaseStateBackend, @@ -39,11 +41,14 @@ def resolve_state_backend() -> type[BaseStateBackend]: _backend_instance: BaseStateBackend | None = None +_backend_lock = threading.Lock() def get_state_backend() -> BaseStateBackend: """Return a cached instance of the configured state backend.""" global _backend_instance if _backend_instance is None: - _backend_instance = resolve_state_backend()() + with _backend_lock: + if _backend_instance is None: + _backend_instance = resolve_state_backend()() return _backend_instance diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py index ae41398b42758..dfeb69a055ada 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py @@ -21,7 +21,7 @@ import pytest from sqlalchemy import delete, select -from airflow.models.asset import AssetModel +from airflow.models.asset import AssetActive, AssetModel from airflow.models.asset_state import AssetStateModel from airflow.utils.session import create_session @@ -43,6 +43,17 @@ def reset_state_tables(): def asset(session: Session) -> AssetModel: asset = AssetModel(name="test_asset", uri="s3://bucket/test", group="asset") session.add(asset) + session.flush() + session.add(AssetActive.for_asset(asset)) + session.commit() + return asset + + +@pytest.fixture +def inactive_asset(session: Session) -> AssetModel: + """An asset row with no asset_active entry — simulates a removed asset.""" + asset = AssetModel(name="inactive_asset", uri="s3://bucket/inactive", group="asset") + session.add(asset) session.commit() return asset @@ -134,3 +145,15 @@ def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): with create_session() as session: row = session.scalar(select(AssetStateModel).where(AssetStateModel.asset_id == asset.id)) assert row is None + + +class TestInactiveAssetRejected: + """An asset row without a corresponding asset_active entry is treated as not found.""" + + def test_get_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): + response = client.get(_api_url(inactive_asset.name, "watermark")) + assert response.status_code == 404 + + def test_put_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): + response = client.put(_api_url(inactive_asset.name, "watermark"), json={"value": "x"}) + assert response.status_code == 404 diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py index cb27b94cee139..0f2b630297281 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py @@ -165,12 +165,13 @@ def test_clear_removes_all_keys(self, client: TestClient, create_task_instance: assert response.status_code == 204 with create_session() as session: - count = session.scalar( - select(TaskStateModel) - .where(TaskStateModel.dag_id == ti.dag_id, TaskStateModel.task_id == ti.task_id) - .with_only_columns(TaskStateModel.key) - ) - assert count is None + remaining = session.scalars( + select(TaskStateModel.key).where( + TaskStateModel.dag_id == ti.dag_id, + TaskStateModel.task_id == ti.task_id, + ) + ).all() + assert remaining == [] def test_clear_when_empty_is_noop(self, client: TestClient, create_task_instance: CreateTaskInstance): ti = create_task_instance() From 7bd1f4c64ae259a8107292955329368f593353ac Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sat, 2 May 2026 18:51:36 +0530 Subject: [PATCH 6/7] Review comments from kaxil part 2 --- .../execution_api/routes/asset_state.py | 26 ++++---- .../execution_api/routes/task_state.py | 14 +--- .../execution_api/versions/v2026_04_17.py | 8 +-- .../versions/head/test_asset_state.py | 66 +++++++++++++------ .../versions/head/test_task_state.py | 39 ++++++++++- 5 files changed, 103 insertions(+), 50 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py index c528b74581dcb..04ffa4e02e262 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -31,7 +31,7 @@ from typing import Annotated from cadwyn import VersionedAPIRouter -from fastapi import HTTPException, Path, status +from fastapi import HTTPException, Query, status from sqlalchemy import select from airflow._shared.state import AssetScope @@ -40,6 +40,7 @@ AssetStatePutBody, AssetStateResponse, ) +from airflow.api_fastapi.execution_api.security import ExecutionAPIRoute from airflow.models.asset import AssetModel from airflow.state import get_state_backend @@ -50,6 +51,7 @@ # Proper fix is a unified asset-registration check across all asset routes, # not just here. router = VersionedAPIRouter( + route_class=ExecutionAPIRoute, responses={ status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, status.HTTP_404_NOT_FOUND: {"description": "Not found"}, @@ -68,10 +70,10 @@ def _resolve_asset_id(name: str, session: SessionDep) -> int: return asset_id -@router.get("/{name}/{key}") +@router.get("/value") def get_asset_state( - name: Annotated[str, Path(min_length=1)], - key: Annotated[str, Path(min_length=1)], + name: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> AssetStateResponse: """Get an asset state value.""" @@ -88,10 +90,10 @@ def get_asset_state( return AssetStateResponse(value=value) -@router.put("/{name}/{key}", status_code=status.HTTP_204_NO_CONTENT) +@router.put("/value", status_code=status.HTTP_204_NO_CONTENT) def set_asset_state( - name: Annotated[str, Path(min_length=1)], - key: Annotated[str, Path(min_length=1)], + name: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], body: AssetStatePutBody, session: SessionDep, ) -> None: @@ -100,10 +102,10 @@ def set_asset_state( get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) -@router.delete("/{name}/{key}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/value", status_code=status.HTTP_204_NO_CONTENT) def delete_asset_state( - name: Annotated[str, Path(min_length=1)], - key: Annotated[str, Path(min_length=1)], + name: Annotated[str, Query(min_length=1)], + key: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> None: """Delete a single asset state key.""" @@ -111,9 +113,9 @@ def delete_asset_state( get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) -@router.delete("/{name}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/clear", status_code=status.HTTP_204_NO_CONTENT) def clear_asset_state( - name: Annotated[str, Path(min_length=1)], + name: Annotated[str, Query(min_length=1)], session: SessionDep, ) -> None: """Delete all state keys for an asset.""" diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py index 08d7cb4b8b4aa..acdaa8c6a24ee 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py @@ -29,11 +29,12 @@ TaskStatePutBody, TaskStateResponse, ) -from airflow.api_fastapi.execution_api.security import require_auth +from airflow.api_fastapi.execution_api.security import ExecutionAPIRoute, require_auth from airflow.models.taskinstance import TaskInstance as TI from airflow.state import get_state_backend router = VersionedAPIRouter( + route_class=ExecutionAPIRoute, responses={ status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, status.HTTP_403_FORBIDDEN: {"description": "Access denied"}, @@ -123,14 +124,5 @@ def clear_task_state( ``?all_map_indices=true`` is functionally identical to the default and is accepted without error. """ - ti = session.get(TI, task_instance_id) - if ti is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail={ - "reason": "not_found", - "message": f"Task instance {task_instance_id} not found", - }, - ) - scope = TaskScope(dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id, map_index=ti.map_index) + scope = _get_task_scope_for_ti(task_instance_id, session) get_state_backend().clear(scope, all_map_indices=all_map_indices, session=session) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py index a4aed364cc54f..c08454c4786b8 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py @@ -46,8 +46,8 @@ class AddStateEndpoints(VersionChange): endpoint("/state/ti/{task_instance_id}/{key}", ["PUT"]).didnt_exist, endpoint("/state/ti/{task_instance_id}/{key}", ["DELETE"]).didnt_exist, endpoint("/state/ti/{task_instance_id}", ["DELETE"]).didnt_exist, - endpoint("/state/asset/{name}/{key}", ["GET"]).didnt_exist, - endpoint("/state/asset/{name}/{key}", ["PUT"]).didnt_exist, - endpoint("/state/asset/{name}/{key}", ["DELETE"]).didnt_exist, - endpoint("/state/asset/{name}", ["DELETE"]).didnt_exist, + endpoint("/state/asset/value", ["GET"]).didnt_exist, + endpoint("/state/asset/value", ["PUT"]).didnt_exist, + endpoint("/state/asset/value", ["DELETE"]).didnt_exist, + endpoint("/state/asset/clear", ["DELETE"]).didnt_exist, ) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py index dfeb69a055ada..8bdf6ac859965 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state.py @@ -58,30 +58,44 @@ def inactive_asset(session: Session) -> AssetModel: return asset -def _api_url(name: str, key: str | None = None) -> str: - base = f"/execution/state/asset/{name}" - return f"{base}/{key}" if key else base +_VALUE_URL = "/execution/state/asset/value" +_CLEAR_URL = "/execution/state/asset/clear" class TestGetAssetState: def test_get_returns_value(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) + client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"}) - response = client.get(_api_url(asset.name, "watermark")) + response = client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}) assert response.status_code == 200 assert response.json() == {"value": "2026-04-29"} def test_get_missing_key_returns_404(self, client: TestClient, asset: AssetModel): - response = client.get(_api_url(asset.name, "never_set")) + response = client.get(_VALUE_URL, params={"name": asset.name, "key": "never_set"}) assert response.status_code == 404 assert response.json()["detail"]["reason"] == "not_found" + def test_get_asset_name_with_slashes(self, client: TestClient, session): + slashed = AssetModel(name="team/sales/orders", uri="s3://bucket/team/sales", group="asset") + session.add(slashed) + session.flush() + session.add(AssetActive.for_asset(slashed)) + session.commit() + + client.put(_VALUE_URL, params={"name": slashed.name, "key": "wm"}, json={"value": "x"}) + response = client.get(_VALUE_URL, params={"name": slashed.name, "key": "wm"}) + + assert response.status_code == 200 + assert response.json() == {"value": "x"} + class TestPutAssetState: def test_put_creates_row(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) + response = client.put( + _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + ) assert response.status_code == 204 with create_session() as session: @@ -95,25 +109,33 @@ def test_put_creates_row(self, client: TestClient, asset: AssetModel): assert row.value == "2026-04-29" def test_put_overwrites_existing(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-28"}) + client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-28"}) - response = client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) + response = client.put( + _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"} + ) assert response.status_code == 204 - assert client.get(_api_url(asset.name, "watermark")).json() == {"value": "2026-04-29"} + assert client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}).json() == { + "value": "2026-04-29" + } def test_put_empty_body_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.name, "watermark"), json={}) + response = client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={}) assert response.status_code == 422 def test_put_extra_field_returns_422(self, client: TestClient, asset: AssetModel): - response = client.put(_api_url(asset.name, "watermark"), json={"value": "x", "extra": "y"}) + response = client.put( + _VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "x", "extra": "y"} + ) assert response.status_code == 422 def test_put_unknown_asset_returns_404(self, client: TestClient): - response = client.put(_api_url("nonexistent", "watermark"), json={"value": "x"}) + response = client.put( + _VALUE_URL, params={"name": "nonexistent", "key": "watermark"}, json={"value": "x"} + ) assert response.status_code == 404 assert "nonexistent" in response.json()["detail"]["message"] @@ -121,15 +143,15 @@ def test_put_unknown_asset_returns_404(self, client: TestClient): class TestDeleteAssetState: def test_delete_removes_key(self, client: TestClient, asset: AssetModel): - client.put(_api_url(asset.name, "watermark"), json={"value": "2026-04-29"}) + client.put(_VALUE_URL, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-04-29"}) - response = client.delete(_api_url(asset.name, "watermark")) + response = client.delete(_VALUE_URL, params={"name": asset.name, "key": "watermark"}) assert response.status_code == 204 - assert client.get(_api_url(asset.name, "watermark")).status_code == 404 + assert client.get(_VALUE_URL, params={"name": asset.name, "key": "watermark"}).status_code == 404 def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel): - response = client.delete(_api_url(asset.name, "never_existed")) + response = client.delete(_VALUE_URL, params={"name": asset.name, "key": "never_existed"}) assert response.status_code == 204 @@ -137,9 +159,9 @@ def test_delete_missing_key_is_noop(self, client: TestClient, asset: AssetModel) class TestClearAssetState: def test_clear_removes_all_keys(self, client: TestClient, asset: AssetModel): for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash", "c")]: - client.put(_api_url(asset.name, k), json={"value": v}) + client.put(_VALUE_URL, params={"name": asset.name, "key": k}, json={"value": v}) - response = client.delete(_api_url(asset.name)) + response = client.delete(_CLEAR_URL, params={"name": asset.name}) assert response.status_code == 204 with create_session() as session: @@ -151,9 +173,11 @@ class TestInactiveAssetRejected: """An asset row without a corresponding asset_active entry is treated as not found.""" def test_get_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): - response = client.get(_api_url(inactive_asset.name, "watermark")) + response = client.get(_VALUE_URL, params={"name": inactive_asset.name, "key": "watermark"}) assert response.status_code == 404 def test_put_inactive_asset_returns_404(self, client: TestClient, inactive_asset: AssetModel): - response = client.put(_api_url(inactive_asset.name, "watermark"), json={"value": "x"}) + response = client.put( + _VALUE_URL, params={"name": inactive_asset.name, "key": "watermark"}, json={"value": "x"} + ) assert response.status_code == 404 diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py index 0f2b630297281..8a66a0a23c739 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state.py @@ -20,18 +20,22 @@ from uuid import uuid4 import pytest +from fastapi import Request +from fastapi.testclient import TestClient from sqlalchemy import delete, select from airflow._shared.timezones import timezone +from airflow.api_fastapi.app import cached_app +from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken +from airflow.api_fastapi.execution_api.security import _jwt_bearer from airflow.models.dagrun import DagRun from airflow.models.task_state import TaskStateModel from airflow.utils.session import create_session if TYPE_CHECKING: - from fastapi.testclient import TestClient - from tests_common.pytest_plugin import CreateTaskInstance + pytestmark = pytest.mark.db_test @@ -237,3 +241,34 @@ def test_clear_with_all_map_indices_query_param_wipes_fleet( ) ).all() assert remaining == [] + + +class TestTiSelfEnforcement: + @pytest.fixture + def wrong_ti_client(self): + """TestClient using the real require_auth, JWT bound to a random TI UUID.""" + app = cached_app(apps="execution") + other_ti_id = uuid4() + + async def mock_jwt(request: Request) -> TIToken: + return TIToken(id=other_ti_id, claims=TIClaims(scope="execution")) + + app.dependency_overrides[_jwt_bearer] = mock_jwt + with TestClient(app, headers={"Authorization": "Bearer fake"}) as client: + yield client + app.dependency_overrides.pop(_jwt_bearer, None) + + def test_get_with_wrong_ti_token_returns_403(self, wrong_ti_client: TestClient, create_task_instance): + ti = create_task_instance() + response = wrong_ti_client.get(_api_url(ti.id, "some_key")) + assert response.status_code == 403 + + def test_put_with_wrong_ti_token_returns_403(self, wrong_ti_client: TestClient, create_task_instance): + ti = create_task_instance() + response = wrong_ti_client.put(_api_url(ti.id, "some_key"), json={"value": "x"}) + assert response.status_code == 403 + + def test_clear_with_wrong_ti_token_returns_403(self, wrong_ti_client: TestClient, create_task_instance): + ti = create_task_instance() + response = wrong_ti_client.delete(_api_url(ti.id)) + assert response.status_code == 403 From 8d0d6de99fbf880a7c708fc341c7634f69dc0712 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sun, 3 May 2026 15:36:27 +0530 Subject: [PATCH 7/7] trying to fix mypy issues --- .../api_fastapi/execution_api/routes/asset_state.py | 8 ++++---- .../api_fastapi/execution_api/routes/task_state.py | 8 ++++---- .../api_fastapi/execution_api/versions/__init__.py | 2 +- .../versions/0113_3_3_0_add_retry_policy_fields_to_ti.py | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py index 04ffa4e02e262..ba00260b16b2d 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py @@ -78,7 +78,7 @@ def get_asset_state( ) -> AssetStateResponse: """Get an asset state value.""" asset_id = _resolve_asset_id(name, session) - value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) + value = get_state_backend().get(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it if value is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -99,7 +99,7 @@ def set_asset_state( ) -> None: """Set an asset state value.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) + get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it @router.delete("/value", status_code=status.HTTP_204_NO_CONTENT) @@ -110,7 +110,7 @@ def delete_asset_state( ) -> None: """Delete a single asset state key.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) + get_state_backend().delete(AssetScope(asset_id=asset_id), key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it @router.delete("/clear", status_code=status.HTTP_204_NO_CONTENT) @@ -120,4 +120,4 @@ def clear_asset_state( ) -> None: """Delete all state keys for an asset.""" asset_id = _resolve_asset_id(name, session) - get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) + get_state_backend().clear(AssetScope(asset_id=asset_id), session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py index acdaa8c6a24ee..db24109969c76 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py @@ -65,7 +65,7 @@ def get_task_state( ) -> TaskStateResponse: """Get value for a task state.""" scope = _get_task_scope_for_ti(task_instance_id, session) - value = get_state_backend().get(scope, key, session=session) + value = get_state_backend().get(scope, key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it if value is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -86,7 +86,7 @@ def set_task_state( ) -> None: """Set a task state key, creating or updating the row.""" scope = _get_task_scope_for_ti(task_instance_id, session) - get_state_backend().set(scope, key, body.value, session=session) + get_state_backend().set(scope, key, body.value, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it @router.delete("/{task_instance_id}/{key}", status_code=status.HTTP_204_NO_CONTENT) @@ -97,7 +97,7 @@ def delete_task_state( ) -> None: """Delete a single task state key.""" scope = _get_task_scope_for_ti(task_instance_id, session) - get_state_backend().delete(scope, key, session=session) + get_state_backend().delete(scope, key, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it @router.delete("/{task_instance_id}", status_code=status.HTTP_204_NO_CONTENT) @@ -125,4 +125,4 @@ def clear_task_state( accepted without error. """ scope = _get_task_scope_for_ti(task_instance_id, session) - get_state_backend().clear(scope, all_map_indices=all_map_indices, session=session) + get_state_backend().clear(scope, all_map_indices=all_map_indices, session=session) # type: ignore[call-arg] # @provide_session adds session kwarg at runtime; BaseStateBackend signature omits it so mypy can't see it diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 84f9d4db47bab..dfa27f53ebd91 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -40,7 +40,7 @@ MovePreviousRunEndpoint, RemoveUpstreamMapIndexesField, ) -from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddTeamNameField, AddStateEndpoints +from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddStateEndpoints, AddTeamNameField from airflow.api_fastapi.execution_api.versions.v2026_06_16 import AddRetryPolicyFields bundle = VersionBundle( diff --git a/airflow-core/src/airflow/migrations/versions/0113_3_3_0_add_retry_policy_fields_to_ti.py b/airflow-core/src/airflow/migrations/versions/0113_3_3_0_add_retry_policy_fields_to_ti.py index 5fc8007e4178e..a390163b57dc0 100644 --- a/airflow-core/src/airflow/migrations/versions/0113_3_3_0_add_retry_policy_fields_to_ti.py +++ b/airflow-core/src/airflow/migrations/versions/0113_3_3_0_add_retry_policy_fields_to_ti.py @@ -30,7 +30,7 @@ is a metadata-only operation (no table rewrite). Revision ID: b8f3e4a1d2c9 -Revises: 9fabad868fdb +Revises: fde9ed84d07b Create Date: 2026-04-16 12:00:00.000000 """