From c5f8d038cd6fcc31cf7196d1814d91710cca5f16 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 22 Apr 2025 12:06:09 +0100 Subject: [PATCH] LatestOnly operator should not compare to `now`, but to the DagRun dates Comparing to now means that if you clear a dagrun and it re-runs it will always show up as latest! Additionally remove the check/don't skip logic when the data interval is zero-width. Even if a DAG doesn't have the concept of a data-interval (i.e. it is zero width) it still is logically consistent for it to have the concept of latest or not, so we now only compare against the end date of the interval. (And a few drive-by refactors too, `context["task"]` is `self`, `context["dag"]` is `self.dag`) --- .../standard/operators/latest_only.py | 47 ++++++------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/latest_only.py b/providers/standard/src/airflow/providers/standard/operators/latest_only.py index fd5cf62bbad2f..834f0c4de3063 100644 --- a/providers/standard/src/airflow/providers/standard/operators/latest_only.py +++ b/providers/standard/src/airflow/providers/standard/operators/latest_only.py @@ -22,14 +22,12 @@ from collections.abc import Iterable from typing import TYPE_CHECKING -import pendulum - from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.types import DagRunType if TYPE_CHECKING: - from airflow.models import DAG, DagRun + from airflow.models import DagRun from airflow.timetables.base import DagRunInfo try: @@ -62,54 +60,39 @@ def choose_branch(self, context: Context) -> str | Iterable[str]: dag_run: DagRun = context["dag_run"] # type: ignore[assignment] if dag_run.run_type == DagRunType.MANUAL: self.log.info("Manually triggered DAG_Run: allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) + 
return list(self.get_direct_relative_ids(upstream=False)) - next_info = self._get_next_run_info(context, dag_run) - now = pendulum.now("UTC") + next_info = self._get_next_run_info(dag_run) if next_info is None: self.log.info("Last scheduled execution: allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) - - left_window, right_window = next_info.data_interval - self.log.info( - "Checking latest only with left_window: %s right_window: %s now: %s", - left_window, - right_window, - now, - ) - - if left_window == right_window: - self.log.info( - "Zero-length interval [%s, %s) from timetable (%s); treating current run as latest.", - left_window, - right_window, - self.dag.timetable.__class__, - ) - return list(context["task"].get_direct_relative_ids(upstream=False)) + return list(self.get_direct_relative_ids(upstream=False)) - if not left_window < now <= right_window: + dag_run_date = dag_run.data_interval_end or dag_run.run_after + if dag_run_date < next_info.data_interval.end: self.log.info("Not latest execution, skipping downstream.") # we return an empty list, thus the parent BaseBranchOperator # won't exclude any downstream tasks from skipping. 
return [] - self.log.info("Latest, allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) - def _get_next_run_info(self, context: Context, dag_run: DagRun) -> DagRunInfo | None: - dag: DAG = context["dag"] # type: ignore[assignment] + self.log.info("Latest, allowing execution to proceed.") + return list(self.get_direct_relative_ids(upstream=False)) + def _get_next_run_info(self, dag_run: DagRun) -> DagRunInfo | None: if AIRFLOW_V_3_0_PLUS: from airflow.timetables.base import DataInterval, TimeRestriction time_restriction = TimeRestriction(earliest=None, latest=None, catchup=True) - current_interval = DataInterval(start=dag_run.data_interval_start, end=dag_run.data_interval_end) + current_interval = DataInterval( + start=dag_run.data_interval_start or dag_run.run_after, + end=dag_run.data_interval_end or dag_run.run_after, + ) - next_info = dag.timetable.next_dagrun_info( + next_info = self.dag.timetable.next_dagrun_info( last_automated_data_interval=current_interval, restriction=time_restriction, ) else: - next_info = dag.next_dagrun_info(dag.get_run_data_interval(dag_run), restricted=False) + next_info = self.dag.next_dagrun_info(self.dag.get_run_data_interval(dag_run), restricted=False) return next_info