From b293b7841cadbde5c413e39df928ce54b07da590 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 12 Feb 2025 11:50:38 +0800 Subject: [PATCH 1/2] Remove 'allow_trigger_in_future' config A DAG run with logical date in the future can never be started now. This only affects schedule=None, which can only be triggered manually. Instead of using a future date, you can trigger with a None logical date whenever you want. A custom run_id can be supplied if you want it. If a date is needed, it can be passed as a DAG param instead. --- airflow/config_templates/config.yml | 8 ---- airflow/jobs/scheduler_job_runner.py | 2 +- airflow/models/dag.py | 5 +-- airflow/models/dagrun.py | 7 +--- airflow/settings.py | 2 - .../ti_deps/deps/runnable_exec_date_dep.py | 11 +++--- .../scheduler.rst | 9 ----- task_sdk/src/airflow/sdk/definitions/dag.py | 4 -- .../deps/test_runnable_exec_date_dep.py | 38 ++++++++----------- 9 files changed, 25 insertions(+), 61 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 201d4e3e6bc83..fc5546759ba25 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2398,14 +2398,6 @@ scheduler: type: boolean example: ~ default: "True" - allow_trigger_in_future: - description: | - Allow externally triggered DagRuns for Execution Dates in the future - Only has effect if schedule is set to None in DAG - version_added: 1.10.8 - type: boolean - example: ~ - default: "False" trigger_timeout_check_interval: description: | How often to check for expired trigger requests that have not run yet. diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index f340893e5aef7..3703268b07ec5 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1656,7 +1656,7 @@ def _schedule_dag_run( ) return callback_to_execute - if dag_run.logical_date > timezone.utcnow() and not dag.allow_future_exec_dates: + if dag_run.logical_date and dag_run.logical_date > timezone.utcnow(): self.log.error("Logical date is in future: %s", dag_run.logical_date) return callback diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 111852f2728ee..8ae5935bad3c4 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1054,10 +1054,7 @@ def _get_task_instances( tis = tis.where(DagRun.logical_date >= start_date) if task_ids is not None: tis = tis.where(TaskInstance.ti_selector_condition(task_ids)) - - # This allows allow_trigger_in_future config to take affect, rather than mandating exec_date <= UTC - if end_date or not self.allow_future_exec_dates: - end_date = end_date or timezone.utcnow() + if end_date: tis = tis.where(DagRun.logical_date <= end_date) if state: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 199762d69ef7c..808543d0b50f9 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -53,7 +53,6 @@ from sqlalchemy.sql.functions import coalesce from sqlalchemy_utils import UUIDType -from airflow import settings from airflow.callbacks.callback_requests import DagCallbackRequest from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound @@ -456,8 +455,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Query: .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE) ) - if not settings.ALLOW_FUTURE_LOGICAL_DATES: - query = query.where(DagRun.logical_date <= func.now()) + query = query.where(DagRun.run_after <= func.now()) return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)) @@ -542,8 +540,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query: .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE) ) - if not settings.ALLOW_FUTURE_LOGICAL_DATES: - query = query.where(DagRun.logical_date <= func.now()) + query = query.where(DagRun.run_after <= func.now()) return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)) diff --git a/airflow/settings.py b/airflow/settings.py index d8e796db7ce7e..82168685aa7e0 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -677,8 +677,6 @@ def initialize(): fallback=False, ) -ALLOW_FUTURE_LOGICAL_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) - USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True) # By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False, diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py index 990264563a2fc..0f996e5e9f409 100644 --- a/airflow/ti_deps/deps/runnable_exec_date_dep.py +++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py @@ -25,17 +25,18 @@ class RunnableExecDateDep(BaseTIDep): """Determines whether a task's logical date is valid.""" - NAME = "Execution Date" + NAME = "Logical Date" IGNORABLE = True @provide_session def _get_dep_statuses(self, ti, session, dep_context): + logical_date = ti.get_dagrun(session).logical_date + if logical_date is None: + return + cur_date = timezone.utcnow() - # don't consider runs that are executed in the future unless - # specified by config and schedule is None - logical_date = ti.get_dagrun(session).logical_date - if logical_date > cur_date and not ti.task.dag.allow_future_exec_dates: + if logical_date > cur_date: yield self._failing_status( reason=( f"Logical date {logical_date.isoformat()} is in the future " diff --git a/docs/apache-airflow/administration-and-deployment/scheduler.rst b/docs/apache-airflow/administration-and-deployment/scheduler.rst index be0bd24b63797..a6416d774c1d7 100644 --- a/docs/apache-airflow/administration-and-deployment/scheduler.rst +++ b/docs/apache-airflow/administration-and-deployment/scheduler.rst @@ -66,15 +66,6 @@ In the UI, it appears as if Airflow is running your tasks a day **late** waiting than the queue slots. Thus there can be cases where low priority tasks will be scheduled before high priority tasks if they share the same batch. For more read about that you can reference `this GitHub discussion `__. - -Triggering DAG with Future Date -------------------------------- - -If you want to use 'external trigger' to run future-dated data intervals, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``. -This only has effect if your DAG is defined with ``schedule=None``. -When set to ``False`` (the default value), if you manually trigger a run with future-dated data intervals, -the scheduler will not execute it until its ``data_interval_start`` is in the past. - .. _scheduler:ha: Running More Than One Scheduler diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py index 3c8d99737ab8e..ce37ec62ed8eb 100644 --- a/task_sdk/src/airflow/sdk/definitions/dag.py +++ b/task_sdk/src/airflow/sdk/definitions/dag.py @@ -651,10 +651,6 @@ def owner(self) -> str: """ return ", ".join({t.owner for t in self.tasks}) - @property - def allow_future_exec_dates(self) -> bool: - return settings.ALLOW_FUTURE_LOGICAL_DATES and not self.timetable.can_be_scheduled - def resolve_template_files(self): for t in self.tasks: # TODO: TaskSDK: move this on to BaseOperator and remove the check? diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index b24c22cbbe930..94b9cfc2ac6cc 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -17,12 +17,11 @@ # under the License. from __future__ import annotations -from unittest.mock import Mock, patch +from unittest.mock import Mock import pytest import time_machine -from airflow import settings from airflow.models import DagRun, TaskInstance from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep from airflow.utils.timezone import datetime @@ -40,39 +39,32 @@ def clean_db(session): @time_machine.travel("2016-11-01") @pytest.mark.parametrize( - "allow_trigger_in_future,schedule,logical_date,is_met", + "logical_date, is_met", [ - (True, "@daily", datetime(2016, 11, 3), False), - (False, None, datetime(2016, 11, 3), False), - (False, "@daily", datetime(2016, 11, 3), False), - (False, "@daily", datetime(2016, 11, 1), True), - (False, None, datetime(2016, 11, 1), True), + (datetime(2016, 11, 3), False), + (datetime(2016, 11, 1), True), ], ) def test_logical_date_dep( dag_maker, session, create_dummy_dag, - allow_trigger_in_future, - schedule, logical_date, is_met, ): """ - If the dag's logical date is in the future but (allow_trigger_in_future=False or not schedule) - this dep should fail + If the dag's logical date is in the future, this dep should fail """ - with patch.object(settings, "ALLOW_FUTURE_LOGICAL_DATES", allow_trigger_in_future): - create_dummy_dag( - "test_localtaskjob_heartbeat", - start_date=datetime(2015, 1, 1), - end_date=datetime(2016, 11, 5), - schedule=schedule, - with_dagrun_type=DagRunType.MANUAL, - session=session, - ) - (ti,) = dag_maker.create_dagrun(run_id="scheduled", logical_date=logical_date).task_instances - assert RunnableExecDateDep().is_met(ti=ti) == is_met + create_dummy_dag( + "test_localtaskjob_heartbeat", + start_date=datetime(2015, 1, 1), + end_date=datetime(2016, 11, 5), + schedule=None, + with_dagrun_type=DagRunType.MANUAL, + session=session, + ) + (ti,) = dag_maker.create_dagrun(run_id="scheduled", logical_date=logical_date).task_instances + assert RunnableExecDateDep().is_met(ti=ti) == is_met @time_machine.travel("2016-01-01") From 3ac99454819b1f77d2ff245fd53b73711bf031f3 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 12 Feb 2025 12:23:04 +0800 Subject: [PATCH 2/2] Add news fragment --- newsfragments/46663.significant.rst | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 newsfragments/46663.significant.rst diff --git a/newsfragments/46663.significant.rst b/newsfragments/46663.significant.rst new file mode 100644 index 0000000000000..c0eeb797c7e2e --- /dev/null +++ b/newsfragments/46663.significant.rst @@ -0,0 +1,31 @@ +Removed configuration ``scheduler.allow_trigger_in_future``. + +A DAG run with logical date in the future can never be started now. This only affects ``schedule=None``. + +Instead of using a future date, you can trigger with ``logical_date=None``. A custom ``run_id`` can be supplied if desired. If a date is needed, it can be passed as a DAG param instead. + +Property ``allow_future_exec_dates`` on the DAG class has also been removed. + + +* Types of change + + * [ ] Dag changes + * [x] Config changes + * [ ] API changes + * [ ] CLI changes + * [ ] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [x] Code interface changes + +* Migration rules needed + + * ruff + + * AIR302 + + * [ ] property ``airflow.models.dag.DAG.allow_future_exec_dates`` + + * ``airflow config lint`` + + * [ ] ``scheduler.allow_trigger_in_future``