From 95601fddcbf3e004081e0d39297a93b8da861a41 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 1 Sep 2023 14:33:27 +0200 Subject: [PATCH 1/8] Fix manual task triggering scheduled tasks Fixes #33949 --- airflow/jobs/scheduler_job_runner.py | 25 ++++++++++-------- airflow/models/dag.py | 38 ++++++++++++++-------------- tests/jobs/test_scheduler_job.py | 3 ++- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 52f6fb737a442..80b0dd342b4c4 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1192,7 +1192,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - ) active_runs_of_dags[dag.dag_id] += 1 if self._should_update_dag_next_dagruns( - dag, dag_model, active_runs_of_dags[dag.dag_id], session=session + dag, dag_model, None, active_runs_of_dags[dag.dag_id], session=session ): dag_model.calculate_dagrun_date_fields(dag, data_interval) # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in @@ -1300,9 +1300,16 @@ def _create_dag_runs_dataset_triggered( ) def _should_update_dag_next_dagruns( - self, dag: DAG, dag_model: DagModel, total_active_runs: int | None = None, *, session: Session + self, dag: DAG, dag_model: DagModel, last_dag_run: DagRun | None = None, + total_active_runs: int | None = None, *, session: Session ) -> bool: - """Check if the dag's next_dagruns_create_after should be updated.""" + """Check if the dag's next_dagruns_create_after should be updated. + If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run + """ + # If last_dag_run is defined, schedule next only if last_dag_run is finished and was an automated run + if last_dag_run and not (last_dag_run.state in State.finished_dr_states and + last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]): + return False # If the DAG never schedules skip save runtime if not dag.timetable.can_be_scheduled: return False @@ -1437,8 +1444,8 @@ def _schedule_dag_run( session.merge(task_instance) session.flush() self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) - # Work out if we should allow creating a new DagRun now? - if self._should_update_dag_next_dagruns(dag, dag_model, session=session): + + if self._should_update_dag_next_dagruns(dag, dag_model, dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) callback_to_execute = DagCallbackRequest( @@ -1465,11 +1472,9 @@ def _schedule_dag_run( return callback # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - # Check if DAG not scheduled then skip interval calculation to same scheduler runtime - if dag_run.state in State.finished_dr_states: - # Work out if we should allow creating a new DagRun now? - if self._should_update_dag_next_dagruns(dag, dag_model, session=session): - dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + + if self._should_update_dag_next_dagruns(dag, dag_model, dag_run, session=session): + dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) # This will do one query per dag run. We "could" build up a complex # query to update all the TIs across all the execution dates and dag # IDs in a single query, but it turns out that can be _very very slow_ diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 4113ca6b24c64..967ff24966395 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2931,12 +2931,12 @@ def bulk_write_to_db( session.add(orm_dag) orm_dags.append(orm_dag) - most_recent_runs: dict[str, DagRun] = {} + dag_id_to_last_automated_run: dict[str, DagRun] = {} num_active_runs: dict[str, int] = {} # Skip these queries entirely if no DAGs can be scheduled to save time. if any(dag.timetable.can_be_scheduled for dag in dags): - # Get the latest dag run for each existing dag as a single query (avoid n+1 query) - most_recent_subq = ( + # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query) + last_automated_runs_subq = ( select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id.in_(existing_dags), @@ -2945,13 +2945,13 @@ def bulk_write_to_db( .group_by(DagRun.dag_id) .subquery() ) - most_recent_runs_iter = session.scalars( + last_automated_runs = session.scalars( select(DagRun).where( - DagRun.dag_id == most_recent_subq.c.dag_id, - DagRun.execution_date == most_recent_subq.c.max_execution_date, + DagRun.dag_id == last_automated_runs_subq.c.dag_id, + DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, ) ) - most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter} + dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs} # Get number of active dagruns for all dags we are processing as a single query. num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session) @@ -2985,15 +2985,15 @@ def bulk_write_to_db( orm_dag.timetable_description = dag.timetable.description orm_dag.processor_subdir = processor_subdir - run: DagRun | None = most_recent_runs.get(dag.dag_id) - if run is None: - data_interval = None + last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id) + if last_automated_run is None: + last_automated_data_interval = None else: - data_interval = dag.get_run_data_interval(run) + last_automated_data_interval = dag.get_run_data_interval(last_automated_run) if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs: orm_dag.next_dagrun_create_after = None else: - orm_dag.calculate_dagrun_date_fields(dag, data_interval) + orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval) dag_tags = set(dag.tags or {}) orm_dag_tags = list(orm_dag.tags or []) @@ -3659,27 +3659,27 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ def calculate_dagrun_date_fields( self, dag: DAG, - most_recent_dag_run: None | datetime | DataInterval, + last_automated_dag_run: None | datetime | DataInterval, ) -> None: """ Calculate ``next_dagrun`` and `next_dagrun_create_after``. :param dag: The DAG object - :param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none + :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none if not yet scheduled. """ - most_recent_data_interval: DataInterval | None - if isinstance(most_recent_dag_run, datetime): + last_automated_data_interval: DataInterval | None + if isinstance(last_automated_dag_run, datetime): warnings.warn( "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. " "Provide a data interval instead.", RemovedInAirflow3Warning, stacklevel=2, ) - most_recent_data_interval = dag.infer_automated_data_interval(most_recent_dag_run) + last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run) else: - most_recent_data_interval = most_recent_dag_run - next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval) + last_automated_data_interval = last_automated_dag_run + next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval) if next_dagrun_info is None: self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None else: diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 94cf3b6728985..012bd6b7fdd71 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1759,6 +1759,7 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses dag_run = dag_maker.create_dagrun( state=State.RUNNING, session=session, + run_type=DagRunType.SCHEDULED ) # Reach max_active_runs @@ -3458,7 +3459,7 @@ def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted self.job_runner = SchedulerJobRunner(job=scheduler_job) assert excepted is self.job_runner._should_update_dag_next_dagruns( - dag, dag_model, number_running, session=session + dag, dag_model, None, number_running, session=session ) def test_create_dag_runs(self, dag_maker): From 20ffaeeeb2751d274fc71e75ad29e796d26c6702 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 1 Sep 2023 21:12:40 +0200 Subject: [PATCH 2/8] fix static checks --- airflow/jobs/scheduler_job_runner.py | 17 ++++++++++++----- tests/jobs/test_scheduler_job.py | 6 +----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 80b0dd342b4c4..abb423e621f49 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1300,15 +1300,22 @@ def _create_dag_runs_dataset_triggered( ) def _should_update_dag_next_dagruns( - self, dag: DAG, dag_model: DagModel, last_dag_run: DagRun | None = None, - total_active_runs: int | None = None, *, session: Session + self, + dag: DAG, + dag_model: DagModel, + last_dag_run: DagRun | None = None, + total_active_runs: int | None = None, + *, + session: Session, ) -> bool: """Check if the dag's next_dagruns_create_after should be updated. - If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run + If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run. """ # If last_dag_run is defined, schedule next only if last_dag_run is finished and was an automated run - if last_dag_run and not (last_dag_run.state in State.finished_dr_states and - last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]): + if last_dag_run and not ( + last_dag_run.state in State.finished_dr_states + and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB] + ): return False # If the DAG never schedules skip save runtime if not dag.timetable.can_be_scheduled: diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 012bd6b7fdd71..106746cf4df2e 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1756,11 +1756,7 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses # Need to use something that doesn't immediately get marked as success by the scheduler BashOperator(task_id="task", bash_command="true") - dag_run = dag_maker.create_dagrun( - state=State.RUNNING, - session=session, - run_type=DagRunType.SCHEDULED - ) + dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED) # Reach max_active_runs for _ in range(3): From 3db6ba95ee974ea129dfaf81cef1909a3a5bbd00 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 1 Sep 2023 21:56:51 +0200 Subject: [PATCH 3/8] static checks --- airflow/jobs/scheduler_job_runner.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index abb423e621f49..27086d2795283 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1308,10 +1308,9 @@ def _should_update_dag_next_dagruns( *, session: Session, ) -> bool: - """Check if the dag's next_dagruns_create_after should be updated. - If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run. - """ - # If last_dag_run is defined, schedule next only if last_dag_run is finished and was an automated run + """Check if the dag's next_dagruns_create_after should be updated.""" + # If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run. + # In such case, schedule next only if last_dag_run is finished and was an automated run. if last_dag_run and not ( last_dag_run.state in State.finished_dr_states and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB] From 6fa84fdd8043e18ce738f042ff44e33e4455af12 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 1 Sep 2023 22:45:31 +0200 Subject: [PATCH 4/8] add unit test --- tests/jobs/test_scheduler_job.py | 36 ++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 106746cf4df2e..9402e7ea4dae2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3458,6 +3458,42 @@ def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted dag, dag_model, None, number_running, session=session ) + @pytest.mark.parametrize( + "run_type, should_update", + [ + (DagRunType.MANUAL, False), + (DagRunType.SCHEDULED, True), + (DagRunType.BACKFILL_JOB, True), + (DagRunType.DATASET_TRIGGERED, False), + ], + ids=[DagRunType.MANUAL.name, DagRunType.SCHEDULED.name, DagRunType.BACKFILL_JOB.name, DagRunType.DATASET_TRIGGERED.name], + ) + def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker): + """Test that whether next dagrun is updated depends on run type""" + with dag_maker( + dag_id="test_should_update_dag_next_dagruns_after_run_type", schedule="*/1 * * * *", max_active_runs=10 + ) as dag: + EmptyOperator(task_id="dummy") + + dag_model = dag_maker.dag_model + + run = dag_maker.create_dagrun( + run_id=f"run", + run_type=run_type, + execution_date=DEFAULT_DATE, + start_date=timezone.utcnow(), + state=State.SUCCESS, + session=session, + ) + + session.flush() + scheduler_job = Job(executor=self.null_exec) + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + assert should_update is self.job_runner._should_update_dag_next_dagruns( + dag, dag_model, run, 0, session=session + ) + def test_create_dag_runs(self, dag_maker): """ Test various invariants of _create_dag_runs. From ee7effec4f556ff196ee88ae8834a1294097bdcd Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 2 Sep 2023 07:15:23 +0200 Subject: [PATCH 5/8] static check --- tests/jobs/test_scheduler_job.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 9402e7ea4dae2..262751f7fc8ed 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3466,19 +3466,26 @@ def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted (DagRunType.BACKFILL_JOB, True), (DagRunType.DATASET_TRIGGERED, False), ], - ids=[DagRunType.MANUAL.name, DagRunType.SCHEDULED.name, DagRunType.BACKFILL_JOB.name, DagRunType.DATASET_TRIGGERED.name], + ids=[ + DagRunType.MANUAL.name, + DagRunType.SCHEDULED.name, + DagRunType.BACKFILL_JOB.name, + DagRunType.DATASET_TRIGGERED.name, + ], ) def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker): """Test that whether next dagrun is updated depends on run type""" with dag_maker( - dag_id="test_should_update_dag_next_dagruns_after_run_type", schedule="*/1 * * * *", max_active_runs=10 + dag_id="test_should_update_dag_next_dagruns_after_run_type", + schedule="*/1 * * * *", + max_active_runs=10, ) as dag: EmptyOperator(task_id="dummy") dag_model = dag_maker.dag_model run = dag_maker.create_dagrun( - run_id=f"run", + run_id="run", run_type=run_type, execution_date=DEFAULT_DATE, start_date=timezone.utcnow(), From df068d52869646ea6a3f677044610b13ec53498e Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 3 Sep 2023 13:42:16 +0200 Subject: [PATCH 6/8] Undo renaming --- airflow/models/dag.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 967ff24966395..4113ca6b24c64 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2931,12 +2931,12 @@ def bulk_write_to_db( session.add(orm_dag) orm_dags.append(orm_dag) - dag_id_to_last_automated_run: dict[str, DagRun] = {} + most_recent_runs: dict[str, DagRun] = {} num_active_runs: dict[str, int] = {} # Skip these queries entirely if no DAGs can be scheduled to save time. if any(dag.timetable.can_be_scheduled for dag in dags): - # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query) - last_automated_runs_subq = ( + # Get the latest dag run for each existing dag as a single query (avoid n+1 query) + most_recent_subq = ( select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id.in_(existing_dags), @@ -2945,13 +2945,13 @@ def bulk_write_to_db( .group_by(DagRun.dag_id) .subquery() ) - last_automated_runs = session.scalars( + most_recent_runs_iter = session.scalars( select(DagRun).where( - DagRun.dag_id == last_automated_runs_subq.c.dag_id, - DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, + DagRun.dag_id == most_recent_subq.c.dag_id, + DagRun.execution_date == most_recent_subq.c.max_execution_date, ) ) - dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs} + most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter} # Get number of active dagruns for all dags we are processing as a single query. num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session) @@ -2985,15 +2985,15 @@ def bulk_write_to_db( orm_dag.timetable_description = dag.timetable.description orm_dag.processor_subdir = processor_subdir - last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id) - if last_automated_run is None: - last_automated_data_interval = None + run: DagRun | None = most_recent_runs.get(dag.dag_id) + if run is None: + data_interval = None else: - last_automated_data_interval = dag.get_run_data_interval(last_automated_run) + data_interval = dag.get_run_data_interval(run) if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs: orm_dag.next_dagrun_create_after = None else: - orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval) + orm_dag.calculate_dagrun_date_fields(dag, data_interval) dag_tags = set(dag.tags or {}) orm_dag_tags = list(orm_dag.tags or []) @@ -3659,27 +3659,27 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ def calculate_dagrun_date_fields( self, dag: DAG, - last_automated_dag_run: None | datetime | DataInterval, + most_recent_dag_run: None | datetime | DataInterval, ) -> None: """ Calculate ``next_dagrun`` and `next_dagrun_create_after``. :param dag: The DAG object - :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none + :param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none if not yet scheduled. """ - last_automated_data_interval: DataInterval | None - if isinstance(last_automated_dag_run, datetime): + most_recent_data_interval: DataInterval | None + if isinstance(most_recent_dag_run, datetime): warnings.warn( "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. " "Provide a data interval instead.", RemovedInAirflow3Warning, stacklevel=2, ) - last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run) + most_recent_data_interval = dag.infer_automated_data_interval(most_recent_dag_run) else: - last_automated_data_interval = last_automated_dag_run - next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval) + most_recent_data_interval = most_recent_dag_run + next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval) if next_dagrun_info is None: self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None else: From 5b82833a1f62f930a02027e46557037be667acc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Dyl=C4=85g?= Date: Tue, 5 Sep 2023 11:20:54 +0200 Subject: [PATCH 7/8] Update airflow/jobs/scheduler_job_runner.py Co-authored-by: Tzu-ping Chung --- airflow/jobs/scheduler_job_runner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 27086d2795283..80a9899e5a65f 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1192,7 +1192,11 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - ) active_runs_of_dags[dag.dag_id] += 1 if self._should_update_dag_next_dagruns( - dag, dag_model, None, active_runs_of_dags[dag.dag_id], session=session + dag, + dag_model, + last_dag_run=None, + total_active_runs=active_runs_of_dags[dag.dag_id], + session=session, ): dag_model.calculate_dagrun_date_fields(dag, data_interval) # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in From 14f8b60f5935d884223f62fbb5dba74c188a77b9 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Tue, 5 Sep 2023 11:32:14 +0200 Subject: [PATCH 8/8] use keyword-only arguments for last_dag_run and total_active_runs --- airflow/jobs/scheduler_job_runner.py | 6 +++--- tests/jobs/test_scheduler_job.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 80a9899e5a65f..c00886a40f73b 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1307,9 +1307,9 @@ def _should_update_dag_next_dagruns( self, dag: DAG, dag_model: DagModel, + *, last_dag_run: DagRun | None = None, total_active_runs: int | None = None, - *, session: Session, ) -> bool: """Check if the dag's next_dagruns_create_after should be updated.""" @@ -1455,7 +1455,7 @@ def _schedule_dag_run( session.flush() self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) - if self._should_update_dag_next_dagruns(dag, dag_model, dag_run, session=session): + if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) callback_to_execute = DagCallbackRequest( @@ -1483,7 +1483,7 @@ def _schedule_dag_run( # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - if self._should_update_dag_next_dagruns(dag, dag_model, dag_run, session=session): + if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) # This will do one query per dag run. We "could" build up a complex # query to update all the TIs across all the execution dates and dag diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 262751f7fc8ed..4fdb006a38a66 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3455,7 +3455,7 @@ def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted self.job_runner = SchedulerJobRunner(job=scheduler_job) assert excepted is self.job_runner._should_update_dag_next_dagruns( - dag, dag_model, None, number_running, session=session + dag, dag_model, total_active_runs=number_running, session=session ) @pytest.mark.parametrize( @@ -3498,7 +3498,7 @@ def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_up self.job_runner = SchedulerJobRunner(job=scheduler_job) assert should_update is self.job_runner._should_update_dag_next_dagruns( - dag, dag_model, run, 0, session=session + dag, dag_model, last_dag_run=run, total_active_runs=0, session=session ) def test_create_dag_runs(self, dag_maker):