Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68917.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Deadline alerts using a ``VariableInterval`` no longer risk aborting DagRun creation. The interval is now resolved through the full secrets chain (environment variables, configured secrets backends, then the metadata database) on the scheduler's own session, so ``AIRFLOW_VAR_*`` and secrets-backend-backed Variables resolve correctly and the read does not commit inside the scheduler's ``prohibit_commit`` guard. Each deadline alert is also isolated: a single unresolvable or undecodable alert is logged and skipped instead of preventing the DagRun from being created.
154 changes: 114 additions & 40 deletions airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,51 +715,125 @@ def _process_dagrun_deadline_alerts(
if not deadline_alert:
continue

deserialized_deadline_alert = decode_deadline_alert(
{
Encoding.TYPE: DAT.DEADLINE_ALERT,
Encoding.VAR: {
DeadlineAlertFields.REFERENCE: deadline_alert.reference,
DeadlineAlertFields.INTERVAL: deadline_alert.interval,
DeadlineAlertFields.CALLBACK: deadline_alert.callback_def,
},
}
)
# Isolate each deadline alert: creating a deadline is auxiliary to creating the
# DagRun itself, and must never prevent the DagRun from being created. A single bad
# alert -- e.g. a ``VariableInterval`` whose backing Variable is missing / non-integer
# / <= 0 (``coerce_to_timedelta`` raises ``ValueError``), or a reference whose
# ``evaluate_with`` fails -- would otherwise propagate out of ``create_dagrun`` and
# abort the whole run, silently stopping the DAG from scheduling.
#
# Isolation is done with a plain ``try``/``except`` and MUST NOT use
# ``session.begin_nested()``: ``create_dagrun`` runs on the scheduler session inside
# the ``prohibit_commit`` guard, and releasing a SAVEPOINT issues a commit, which trips
# that guard (``RuntimeError("UNEXPECTED COMMIT ...")``) and -- because this very block
# swallows it -- silently skips deadline creation for *every* scheduled DagRun. The
# try/except alone is sufficient: the only DB mutation in the loop body is the final
# ``session.add`` (everything before it is a decode, an in-memory resolution, or a
# read-only ``evaluate_with`` query), so an exception leaves no partial state to undo,
# and the pending ``Deadline`` is persisted by the caller's outer transaction.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to adjust the AI tool you are using to avoid generating these massive comments and docstrings. I would suggest the below instead:

# Deadline creation is best-effort. A failure here must not prevent the DagRun
# itself from being created. Use a plain try/except rather than
# ``session.begin_nested()`` since ``create_dagrun`` runs under
# ``prohibit_commit`` and releasing a SAVEPOINT would trip that guard.

try:
deserialized_deadline_alert = decode_deadline_alert(
{
Encoding.TYPE: DAT.DEADLINE_ALERT,
Encoding.VAR: {
DeadlineAlertFields.REFERENCE: deadline_alert.reference,
DeadlineAlertFields.INTERVAL: deadline_alert.interval,
DeadlineAlertFields.CALLBACK: deadline_alert.callback_def,
},
}
)

interval = deserialized_deadline_alert.interval
interval = deserialized_deadline_alert.interval

if isinstance(interval, VariableInterval):
interval = interval.resolve()
if isinstance(interval, VariableInterval):
interval = self._resolve_variable_interval(interval, session=session)

if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN):
deadline_time = deserialized_deadline_alert.reference.evaluate_with(
session=session,
interval=interval,
# TODO : Pretty sure we can drop these last two; verify after testing is complete
dag_id=self.dag_id,
run_id=orm_dagrun.run_id,
)
if isinstance(deserialized_deadline_alert.reference, SerializedReferenceModels.TYPES.DAGRUN):
deadline_time = deserialized_deadline_alert.reference.evaluate_with(
session=session,
interval=interval,
# TODO : Pretty sure we can drop these last two; verify after testing is complete
dag_id=self.dag_id,
run_id=orm_dagrun.run_id,
)

if deadline_time is not None:
session.add(
Deadline(
deadline_time=deadline_time,
callback=deserialized_deadline_alert.callback,
dagrun_id=orm_dagrun.id,
deadline_alert_id=deadline_alert.id,
dag_id=orm_dagrun.dag_id,
bundle_name=orm_dagrun.dag_model.bundle_name,
if deadline_time is not None:
session.add(
Deadline(
deadline_time=deadline_time,
callback=deserialized_deadline_alert.callback,
dagrun_id=orm_dagrun.id,
deadline_alert_id=deadline_alert.id,
dag_id=orm_dagrun.dag_id,
bundle_name=orm_dagrun.dag_model.bundle_name,
)
)
)
team_name = (
DagModel.get_team_name(self.dag_id, session=session)
if airflow_conf.getboolean("core", "multi_team")
else None
)
stats.incr(
"deadline_alerts.deadline_created",
tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
)
team_name = (
DagModel.get_team_name(self.dag_id, session=session)
if airflow_conf.getboolean("core", "multi_team")
else None
)
stats.incr(
"deadline_alerts.deadline_created",
tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
)
except Exception:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This except is too broad. If the only errors that you expect to encounter here are likely to do with _resolve_variable_interval or deserialization, why not limit it to them? Maybe something like this would be better:

except (ValueError, DeserializationError):

log.exception(
"Failed to create deadline for alert %s on DagRun %s (dag_id=%s); "
"skipping this deadline, the DagRun is unaffected",
getattr(deadline_alert, "id", "<unknown>"),
orm_dagrun.run_id,
self.dag_id,
)
stats.incr("deadline_alerts.deadline_creation_failed", tags={"dag_id": self.dag_id})

@staticmethod
def _resolve_variable_interval(interval: VariableInterval, *, session: Session) -> datetime.timedelta:
"""
Resolve a ``VariableInterval`` to a concrete ``timedelta`` at DagRun creation time.

The backing Variable is read through the full secrets chain (env vars, configured
secrets backends, then the metadata DB) -- the same resolution order as
``Variable.get`` -- rather than reading only the ``variable`` table. Reading the table
directly would bypass ``AIRFLOW_VAR_*`` env vars and secrets backends, so a Variable
that lives there would resolve to ``None`` and the deadline would be silently dropped.

We must NOT call ``Variable.get`` / ``get_variable_from_secrets`` here: the metastore
backend's ``get_variable`` is ``@provide_session`` and, given no session, opens the
thread-local scoped session (the SAME one the scheduler holds) whose context-manager
exit COMMITS -- tripping the ``prohibit_commit`` guard ``create_dagrun`` runs under.
Instead we iterate the backends ourselves and pass the scheduler's ``session`` through
to the metastore backend (env / custom backends ignore it), so the DB read happens on
our session with no commit.

:param interval: The ``VariableInterval`` to resolve.
:param session: The scheduler session, passed through to the metastore backend.
:return: The resolved ``timedelta``.
:raises ValueError: If the Variable cannot be found in any backend, or its value
cannot be coerced to a positive ``timedelta``.
"""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above. I would suggest the below:

"""
Resolve a VariableInterval to a concrete timedelta at DagRun creation.

The Variable is resolved using the standard secrets lookup order. The scheduler
session is passed to the metastore backend to avoid creating a new session
during DagRun creation.

:param interval: The VariableInterval to resolve.
:param session: Scheduler session used for metadata database lookups.
:return: The resolved timedelta.
:raises ValueError: If the Variable cannot be resolved or converted to a valid
timedelta.
"""

from airflow._shared.secrets_backend.base import call_secrets_backend_method
from airflow.configuration import ensure_secrets_loaded
from airflow.secrets.metastore import MetastoreBackend

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these imports need to be here to avoid circular imports or something like that? If not, I would move them to the top.


var_val = None
for secrets_backend in ensure_secrets_loaded():
kwargs = {"session": session} if isinstance(secrets_backend, MetastoreBackend) else {}
var_val = call_secrets_backend_method(
secrets_backend.get_variable, team_name=None, key=interval.key, **kwargs
)
if var_val is not None:
break

if var_val is None:
# Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in
# any backend, so we cannot resolve the interval. This is not a silent skip.
raise ValueError(
f"VariableInterval '{interval.key}' could not be resolved from any "
f"secrets backend, environment variable, or the metadata database"
)

return interval.coerce_to_timedelta(var_val)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be much cleaner. Please see the below:

for backend in ensure_secrets_loaded():
    value = call_secrets_backend_method(
        backend.get_variable,
        team_name=None,
        key=interval.key,
        **({"session": session} if isinstance(backend, MetastoreBackend) else {}),
    )
    if value is not None:
        return interval.coerce_to_timedelta(value)

    # Fail loudly (the per-alert ``except`` isolates it): the Variable does not exist in
    # any backend, so we cannot resolve the interval. This is not a silent skip.
    raise ValueError(
            f"VariableInterval '{interval.key}' could not be resolved from any "
            f"secrets backend, environment variable, or the metadata database"
        )


@provide_session
def set_task_instance_state(
Expand Down
Loading
Loading