diff --git a/providers/standard/src/airflow/providers/standard/operators/latest_only.py b/providers/standard/src/airflow/providers/standard/operators/latest_only.py index fd5cf62bbad2f..834f0c4de3063 100644 --- a/providers/standard/src/airflow/providers/standard/operators/latest_only.py +++ b/providers/standard/src/airflow/providers/standard/operators/latest_only.py @@ -22,14 +22,12 @@ from collections.abc import Iterable from typing import TYPE_CHECKING -import pendulum - from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.types import DagRunType if TYPE_CHECKING: - from airflow.models import DAG, DagRun + from airflow.models import DagRun from airflow.timetables.base import DagRunInfo try: @@ -62,54 +60,39 @@ def choose_branch(self, context: Context) -> str | Iterable[str]: dag_run: DagRun = context["dag_run"] # type: ignore[assignment] if dag_run.run_type == DagRunType.MANUAL: self.log.info("Manually triggered DAG_Run: allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) + return list(self.get_direct_relative_ids(upstream=False)) - next_info = self._get_next_run_info(context, dag_run) - now = pendulum.now("UTC") + next_info = self._get_next_run_info(dag_run) if next_info is None: self.log.info("Last scheduled execution: allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) - - left_window, right_window = next_info.data_interval - self.log.info( - "Checking latest only with left_window: %s right_window: %s now: %s", - left_window, - right_window, - now, - ) - - if left_window == right_window: - self.log.info( - "Zero-length interval [%s, %s) from timetable (%s); treating current run as latest.", - left_window, - right_window, - self.dag.timetable.__class__, - ) - return list(context["task"].get_direct_relative_ids(upstream=False)) + return list(self.get_direct_relative_ids(upstream=False)) - if not left_window < now <= right_window: + dag_run_date = dag_run.data_interval_end or dag_run.run_after + if dag_run_date < next_info.data_interval.end: self.log.info("Not latest execution, skipping downstream.") # we return an empty list, thus the parent BaseBranchOperator # won't exclude any downstream tasks from skipping. return [] - self.log.info("Latest, allowing execution to proceed.") - return list(context["task"].get_direct_relative_ids(upstream=False)) - def _get_next_run_info(self, context: Context, dag_run: DagRun) -> DagRunInfo | None: - dag: DAG = context["dag"] # type: ignore[assignment] + self.log.info("Latest, allowing execution to proceed.") + return list(self.get_direct_relative_ids(upstream=False)) + def _get_next_run_info(self, dag_run: DagRun) -> DagRunInfo | None: if AIRFLOW_V_3_0_PLUS: from airflow.timetables.base import DataInterval, TimeRestriction time_restriction = TimeRestriction(earliest=None, latest=None, catchup=True) - current_interval = DataInterval(start=dag_run.data_interval_start, end=dag_run.data_interval_end) + current_interval = DataInterval( + start=dag_run.data_interval_start or dag_run.run_after, + end=dag_run.data_interval_end or dag_run.run_after, + ) - next_info = dag.timetable.next_dagrun_info( + next_info = self.dag.timetable.next_dagrun_info( last_automated_data_interval=current_interval, restriction=time_restriction, ) else: - next_info = dag.next_dagrun_info(dag.get_run_data_interval(dag_run), restricted=False) + next_info = self.dag.next_dagrun_info(self.dag.get_run_data_interval(dag_run), restricted=False) return next_info