Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions airflow-core/docs/howto/deadline-alerts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Below is an example Dag implementation. If the Dag has not finished 15 minutes a

.. code-block:: python

from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from airflow.sdk import AsyncCallback, DAG, DeadlineAlert, DeadlineReference
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator
Expand Down Expand Up @@ -165,9 +165,7 @@ Here's an example using a fixed datetime:

.. code-block:: python

tomorrow_at_ten = datetime.combine(
datetime.now().date() + timedelta(days=1), time(10, 0), tzinfo=timezone.utc
)
tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0))

with DAG(
dag_id="fixed_deadline_alert",
Expand Down Expand Up @@ -367,19 +365,12 @@ A **custom asynchronous callback** might look like this:
Templating and Context
^^^^^^^^^^^^^^^^^^^^^^

A relatively simple version of the Airflow context is passed to callables, and Airflow runs
:ref:`concepts:jinja-templating` on string-valued callback ``kwargs`` using that context. String
kwargs that contain ``{{ ... }}`` are rendered before the callback runs; non-string kwargs and
strings without template markers are passed through untouched, and a template that fails to render
falls back to its raw value (logged at warning) rather than failing the callback. Templating works
identically on both the synchronous (executor) and asynchronous (triggerer) callback paths.

The variables available for templating are those in the simplified context: the ID and the
calculated deadline time of the Deadline Alert (``{{ deadline.id }}``, ``{{ deadline.deadline_time }}``),
plus the Dag Run fields included in the ``GET`` REST API response for Dag Run (e.g.
``{{ dag_run.run_id }}``, ``{{ run_id }}``, ``{{ logical_date }}``, ``{{ ds }}``, ``{{ ts }}``).
Notifiers continue to run their own templating as part of their execution. Support for a more
comprehensive context will be added in future versions.
Currently, a relatively simple version of the Airflow context is passed to callables, and Airflow does not run
:ref:`concepts:jinja-templating` on the kwargs. However, Notifiers already run templating with the
provided context as part of their execution. This means that templating works with a Notifier
as long as the variables being templated are included in the simplified context. This currently includes the
ID and the calculated deadline time of the Deadline Alert, as well as the data included in the ``GET`` REST API
response for Dag Run. Support for a more comprehensive context and templating will be added in future versions.

Deadline Calculation
^^^^^^^^^^^^^^^^^^^^
Expand All @@ -392,7 +383,7 @@ In the following examples, ``notify_team`` is either a SyncCallback or AsyncCall

.. code-block:: python

next_meeting = datetime(2025, 6, 26, 9, 30, tzinfo=timezone.utc)
next_meeting = datetime(2025, 6, 26, 9, 30)

DeadlineAlert(
reference=DeadlineReference.FIXED_DATETIME(next_meeting),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

from collections.abc import Iterable
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import AliasPath, Field, field_validator
from pydantic import AliasPath, Field

from airflow.api_fastapi.core_api.base import BaseModel

Expand Down Expand Up @@ -53,42 +52,9 @@ class DeadlineAlertResponse(BaseModel):
id: UUID
name: str | None = None
reference_type: str = Field(validation_alias=AliasPath("reference", "reference_type"))
interval: float | None = Field(
default=None,
description=(
"Interval in seconds between the reference time and the deadline. "
"Null for a dynamic interval (e.g. a VariableInterval) whose value is "
"only resolved at scheduler evaluation time."
),
)
interval: float = Field(description="Interval in seconds between deadline evaluations.")
created_at: datetime

@field_validator("interval", mode="before")
@classmethod
def coerce_interval_to_seconds(cls, value: Any) -> float | None:
    """
    Normalize the raw ``interval`` value before Pydantic validates it.

    The stored value is not necessarily a plain number: ``DeadlineAlert.interval`` is a
    JSON column containing the Airflow-serialized SDK interval. A fixed ``timedelta``
    is stored as ``{"__classname__": "datetime.timedelta", "__data__": <seconds>}``,
    while a dynamic ``VariableInterval`` is stored as
    ``{"__classname__": ".../VariableInterval", "__data__": {...}}``. Pydantic cannot
    convert such a dict to ``float`` on its own, which would make the
    ``/ui/dags/{dag_id}/deadlineAlerts`` endpoint fail with a 500 and break the
    run-page deadline status badge.

    :param value: Raw value read from the model attribute.
    :return: ``value`` unchanged when it is ``None`` or already numeric; the numeric
        ``__data__`` payload as ``float`` for a fixed interval; ``None`` for anything
        else (e.g. a dynamic interval that the scheduler resolves later).
    """
    numeric_types = (int, float)

    # Already usable as-is: pass through untouched.
    if value is None or isinstance(value, numeric_types):
        return value

    # Anything that is not a serialized dict has no seconds to report.
    if not isinstance(value, dict):
        return None

    # Fixed timedelta: ``__data__`` holds the total seconds as a number.
    # Dynamic interval (e.g. VariableInterval): ``__data__`` is not numeric -> None.
    payload = value.get("__data__")
    return float(payload) if isinstance(payload, numeric_types) else None


class DeadlineAlertCollectionResponse(BaseModel):
"""DeadlineAlert Collection serializer for responses."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,13 @@ paths:
type: string
description: 'Attributes to order by, multi criteria sort is supported.
Prefix with `-` for descending order. Supported attributes: `id, created_at,
name`'
name, interval`'
default:
- created_at
title: Order By
description: 'Attributes to order by, multi criteria sort is supported. Prefix
with `-` for descending order. Supported attributes: `id, created_at, name`'
with `-` for descending order. Supported attributes: `id, created_at, name,
interval`'
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -2514,13 +2515,9 @@ components:
type: string
title: Reference Type
interval:
anyOf:
- type: number
- type: 'null'
type: number
title: Interval
description: Interval in seconds between the reference time and the deadline.
Null for a dynamic interval (e.g. a VariableInterval) whose value is only
resolved at scheduler evaluation time.
description: Interval in seconds between deadline evaluations.
created_at:
type: string
format: date-time
Expand All @@ -2529,6 +2526,7 @@ components:
required:
- id
- reference_type
- interval
- created_at
title: DeadlineAlertResponse
description: DeadlineAlert serializer for responses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,8 @@ def get_dag_deadline_alerts(
order_by: Annotated[
SortParam,
Depends(
# NOTE: ``interval`` is intentionally NOT a sortable key. ``DeadlineAlert.interval`` is a
# JSON column holding the Airflow-serialized interval — a dict such as
# ``{"__classname__": "datetime.timedelta", "__data__": 300.0}`` for a fixed interval, or a
# structurally different dict for a ``VariableInterval``. Ordering by it at the DB level
# sorts by the JSON text/structure, not the duration, so the result is arbitrary and
# misleading (e.g. a dynamic VariableInterval sorts before/after fixed intervals by shape,
# and "300" vs "3600" compare lexicographically). Meaningful sorting would need a computed
# seconds column. Allow only columns that sort correctly.
SortParam(
["id", "created_at", "name"],
["id", "created_at", "name", "interval"],
DeadlineAlert,
).dynamic_depends(default="created_at")
),
Expand Down
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def app(self):
from airflow.api_fastapi.execution_api.routes.connections import has_connection_access
from airflow.api_fastapi.execution_api.routes.variables import has_variable_access
from airflow.api_fastapi.execution_api.routes.xcoms import has_xcom_access
from airflow.api_fastapi.execution_api.security import _jwt_bearer, require_auth
from airflow.api_fastapi.execution_api.security import _jwt_bearer

self._app = create_task_execution_api_app()

Expand All @@ -403,7 +403,6 @@ async def always_allow(request: Request):
return TIToken(id=ti_id, claims=claims)

self._app.dependency_overrides[_jwt_bearer] = always_allow
self._app.dependency_overrides[require_auth] = always_allow
self._app.dependency_overrides[has_connection_access] = always_allow
self._app.dependency_overrides[has_variable_access] = always_allow
self._app.dependency_overrides[has_xcom_access] = always_allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,10 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Path, Security, status
from fastapi import APIRouter, Depends, HTTPException, Path, status

from airflow.api_fastapi.execution_api.datamodels.connection import ConnectionResponse
from airflow.api_fastapi.execution_api.security import (
CurrentTIToken,
ExecutionAPIRoute,
get_team_name_dep,
require_auth,
)
from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep
from airflow.exceptions import AirflowNotFoundException
from airflow.models.connection import Connection

Expand All @@ -54,19 +49,15 @@ async def has_connection_access(


router = APIRouter(
route_class=ExecutionAPIRoute,
responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not found"}},
dependencies=[Depends(has_connection_access)],
)

log = logging.getLogger(__name__)


@router.get(
"/{connection_id}",
dependencies=[
Security(require_auth, scopes=["token:execution", "token:workload"]),
Depends(has_connection_access),
],
responses={
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the connection"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Annotated

from cadwyn import VersionedAPIRouter
from fastapi import HTTPException, Query, Security, status
from fastapi import HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.exc import NoResultFound

Expand All @@ -33,15 +33,15 @@
from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.api_fastapi.execution_api.datamodels.token import TIToken
from airflow.api_fastapi.execution_api.security import CurrentTIToken, ExecutionAPIRoute, require_auth
from airflow.api_fastapi.execution_api.security import CurrentTIToken
from airflow.exceptions import DagNotPartitionedError, DagRunAlreadyExists, InvalidPartitionKeyError
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DagRunModel
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

router = VersionedAPIRouter(route_class=ExecutionAPIRoute)
router = VersionedAPIRouter()

log = logging.getLogger(__name__)

Expand All @@ -66,7 +66,6 @@ def get_previous_dagrun_compat(

@router.get(
"/{dag_id}/{run_id}",
dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])],
responses={status.HTTP_404_NOT_FOUND: {"description": "Dag run not found"}},
)
def get_dag_run(dag_id: str, run_id: str, session: SessionDep) -> DagRun:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, Security, status
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, status
from sqlalchemy import func, select

from airflow.api_fastapi.common.db.common import SessionDep
Expand All @@ -29,12 +29,7 @@
VariablePostBody,
VariableResponse,
)
from airflow.api_fastapi.execution_api.security import (
CurrentTIToken,
ExecutionAPIRoute,
get_team_name_dep,
require_auth,
)
from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep
from airflow.models.variable import Variable


Expand Down Expand Up @@ -62,7 +57,7 @@ async def has_variable_access(
return True


router = APIRouter(route_class=ExecutionAPIRoute)
router = APIRouter()

log = logging.getLogger(__name__)

Expand All @@ -73,7 +68,6 @@ async def has_variable_access(
# it requires a variable_key path parameter that /keys does not have.
@router.get(
"/keys",
dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])],
responses={
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
},
Expand Down Expand Up @@ -109,10 +103,7 @@ def get_variable_keys(

@router.get(
"/{variable_key:path}",
dependencies=[
Security(require_auth, scopes=["token:execution", "token:workload"]),
Depends(has_variable_access),
],
dependencies=[Depends(has_variable_access)],
responses={
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
Expand Down Expand Up @@ -140,10 +131,7 @@ def get_variable(

@router.put(
"/{variable_key:path}",
dependencies=[
Security(require_auth, scopes=["token:execution"]),
Depends(has_variable_access),
],
dependencies=[Depends(has_variable_access)],
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
Expand All @@ -163,10 +151,7 @@ def put_variable(

@router.delete(
"/{variable_key:path}",
dependencies=[
Security(require_auth, scopes=["token:execution"]),
Depends(has_variable_access),
],
dependencies=[Depends(has_variable_access)],
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
Expand Down
27 changes: 2 additions & 25 deletions airflow-core/src/airflow/api_fastapi/execution_api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:


async def get_team_name_dep(token=CurrentTIToken) -> str | None:
"""Return the team name associated to the task or callback (if any)."""
"""Return the team name associated to the task (if any)."""
from airflow.configuration import conf

if not conf.getboolean("core", "multi_team"):
Expand All @@ -243,12 +243,7 @@ async def get_team_name_dep(token=CurrentTIToken) -> str | None:
from airflow.utils.session import create_session_async

async with create_session_async() as session:
team_name = await session.scalar(_team_name_for_ti_stmt(token.id))
if team_name is not None:
return team_name
# Workload tokens use the callback UUID as sub; fall back to the
# Callback → dag_id → Team path for deadline callback subprocesses.
return await session.scalar(_team_name_for_callback_stmt(token.id))
return await session.scalar(_team_name_for_ti_stmt(token.id))


def get_team_name_for_ti(ti_id, session) -> str | None:
Expand Down Expand Up @@ -294,21 +289,3 @@ def _team_name_for_dag_stmt(dag_id):
.join(DagBundleModel.teams)
.where(DagModel.dag_id == dag_id)
)


def _team_name_for_callback_stmt(callback_id):
    """
    Build the ``SELECT Team.name`` statement for a given callback id.

    The statement walks ``Callback.id -> dag_id -> Team.name``: it starts from
    ``Callback``, joins ``DagModel`` on the callback's ``dag_id``, joins
    ``DagBundleModel`` on the Dag's bundle name, then follows ``DagBundleModel.teams``.

    :param callback_id: Value compared against ``Callback.id`` in the ``WHERE`` clause.
    :return: The un-executed select statement.
    """
    from airflow.models import DagModel
    from airflow.models.callback import Callback
    from airflow.models.dagbundle import DagBundleModel
    from airflow.models.team import Team

    stmt = select(Team.name).select_from(Callback)

    # Callbacks keep their dag_id as a key inside the JSON ``data`` column, so the
    # join condition compares against that extracted string value.
    dag_id_from_callback = Callback.data["dag_id"].as_string()
    stmt = stmt.join(DagModel, DagModel.dag_id == dag_id_from_callback)

    stmt = stmt.join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
    stmt = stmt.join(DagBundleModel.teams)

    return stmt.where(Callback.id == callback_id)
Loading
Loading