Skip to content

Resolve VariableInterval deadlines safely at DagRun creation#68917

Open
seanghaeli wants to merge 2 commits into
apache:mainfrom
aws-mwaa:feature/variable-interval-resolution
Open

Resolve VariableInterval deadlines safely at DagRun creation#68917
seanghaeli wants to merge 2 commits into
apache:mainfrom
aws-mwaa:feature/variable-interval-resolution

Conversation

@seanghaeli

@seanghaeli seanghaeli commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Why

This re-introduces the VariableInterval resolution portion of #66608.

A DeadlineAlert configured with a VariableInterval is resolved when the scheduler creates a DagRun, inside DAG._process_dagrun_deadline_alerts (which runs under the prohibit_commit guard). Two problems are fixed:

  1. Resolution goes through the full secrets chain (env vars, configured secrets backends, then the metadata DB) via a dedicated _resolve_variable_interval helper, rather than Variable.get / begin_nested. Variable.get and a SAVEPOINT release both commit on the scheduler's session, tripping prohibit_commit (UNEXPECTED COMMIT) and silently dropping deadlines for every scheduled DagRun. The helper passes the scheduler session through to the metastore backend so the DB read does not commit, and reading via the secrets chain (not the variable table directly) means AIRFLOW_VAR_* env vars and secrets-backend-backed Variables resolve too.

  2. Each deadline alert is isolated with a plain try/except (deliberately not begin_nested, which would commit a SAVEPOINT and trip the same guard). Creating a deadline is auxiliary to creating the DagRun; a single bad alert — a missing/invalid backing Variable, or an undecodable serialized blob — must never abort the DagRun and stop the DAG from scheduling.

VariableInterval.resolve is split into resolve + coerce_to_timedelta so the scheduler-side reader reuses the exact same validation (including the OverflowError -> ValueError translation) without going through Variable.get.

Tests

  • airflow-core/tests/unit/models/test_dagrun.py: VariableInterval resolves from a real Variable row and from an AIRFLOW_VAR_* env var; a missing Variable and an undecodable alert are isolated (DagRun still created, no Deadline row, error logged).
  • task-sdk/tests/task_sdk/definitions/test_deadline.py: coerce_to_timedelta validation (non-integer, <= 0, overflow).

Verified locally in Breeze: 8 passed (dagrun deadline/variable) + 14 passed (SDK TestVariableInterval).

Generated-by: Claude Code (Opus via Claude Code) on behalf of Sean Ghaeli

@seanghaeli seanghaeli force-pushed the feature/variable-interval-resolution branch 2 times, most recently from 7aee60c to 94825ec Compare June 23, 2026 20:54
@seanghaeli seanghaeli requested a review from potiuk as a code owner June 23, 2026 20:54
@seanghaeli seanghaeli marked this pull request as draft June 23, 2026 20:57
A ``DeadlineAlert`` configured with a ``VariableInterval`` is resolved when the
scheduler creates a DagRun, inside ``DAG._process_dagrun_deadline_alerts`` (which
runs under the ``prohibit_commit`` guard). Two problems are fixed:

1. Resolution now goes through the full secrets chain (env vars, configured
   secrets backends, then the metadata DB) via a dedicated
   ``_resolve_variable_interval`` helper, rather than ``Variable.get`` /
   ``begin_nested``. ``Variable.get`` and a SAVEPOINT release both commit on the
   scheduler's session, tripping ``prohibit_commit`` ("UNEXPECTED COMMIT") and
   silently dropping deadlines for every scheduled DagRun. The helper passes the
   scheduler session through to the metastore backend so the DB read happens
   without committing, and reading via the secrets chain (not the variable table
   directly) means ``AIRFLOW_VAR_*`` env vars and secrets backends resolve too.

2. Each deadline alert is isolated with a plain ``try``/``except`` (NOT
   ``begin_nested``). Creating a deadline is auxiliary to creating the DagRun; a
   single bad alert -- a missing/invalid backing Variable, or an undecodable
   serialized blob -- must never abort the DagRun and stop the DAG scheduling.

``VariableInterval.resolve`` is split into ``resolve`` + ``coerce_to_timedelta``
so the scheduler-side reader reuses the exact same validation (including the
OverflowError -> ValueError translation) without going through ``Variable.get``.

Generated-by: Claude Code (Sonnet/Opus via Claude Code) on behalf of Sean Ghaeli
@seanghaeli seanghaeli force-pushed the feature/variable-interval-resolution branch from 94825ec to 2342c03 Compare June 23, 2026 21:31
@seanghaeli seanghaeli marked this pull request as ready for review June 23, 2026 23:00
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Jun 25, 2026

@SameerMesiah97 SameerMesiah97 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not manage to get to the tests but if I have to be honest, I think this needs a round of polish before a maintainer can review it. Conceptually, it looks correct but there are classic smells of unvetted AI-generated content such as messy code, long commnets/docstrings etc. I would convert it to draft and clean it up before requesting review.

I have left some comments.

"deadline_alerts.deadline_created",
tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
)
except Exception:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This except is too broad. If the only errors that you expect to encounter here are likely to do with _resolve_variable_interval or deserialization, why not limit it to them? Maybe something like this would be better:

except (ValueError, DeserializationError):

# try/except alone is sufficient: the only DB mutation in the loop body is the final
# ``session.add`` (everything before it is a decode, an in-memory resolution, or a
# read-only ``evaluate_with`` query), so an exception leaves no partial state to undo,
# and the pending ``Deadline`` is persisted by the caller's outer transaction.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to adjust the AI tool you are using to avoid generating these massive comments and docstrings. I would suggest the below instead:

# Deadline creation is best-effort. A failure here must not prevent the DagRun
# itself from being created. Use a plain try/except rather than
# ``session.begin_nested()`` since ``create_dagrun`` runs under
# ``prohibit_commit`` and releasing a SAVEPOINT would trip that guard.

:return: The resolved ``timedelta``.
:raises ValueError: If the Variable cannot be found in any backend, or its value
cannot be coerced to a positive ``timedelta``.
"""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above. I would suggest the below:

"""
Resolve a VariableInterval to a concrete timedelta at DagRun creation.

The Variable is resolved using the standard secrets lookup order. The scheduler
session is passed to the metastore backend to avoid creating a new session
during DagRun creation.

:param interval: The VariableInterval to resolve.
:param session: Scheduler session used for metadata database lookups.
:return: The resolved timedelta.
:raises ValueError: If the Variable cannot be resolved or converted to a valid
timedelta.
"""

"""
from airflow._shared.secrets_backend.base import call_secrets_backend_method
from airflow.configuration import ensure_secrets_loaded
from airflow.secrets.metastore import MetastoreBackend

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these imports need to be here to avoid circular imports or something like that? If not, I would move them to the top.

f"secrets backend, environment variable, or the metadata database"
)

return interval.coerce_to_timedelta(var_val)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be much cleaner. Please see the below:

for backend in ensure_secrets_loaded():
    value = call_secrets_backend_method(
        backend.get_variable,
        team_name=None,
        key=interval.key,
        **({"session": session} if isinstance(backend, MetastoreBackend) else {}),
    )
    if value is not None:
        return interval.coerce_to_timedelta(value)

    # Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in
    # any backend, so we cannot resolve the interval. This is not a silent skip.
    raise ValueError(
            f"VariableInterval '{interval.key}' could not be resolved from any "
            f"secrets backend, environment variable, or the metadata database"
        )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:DAG-processing area:deadline-alerts AIP-86 (former AIP-57) area:task-sdk ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants