From be28e93053627711cb561d2816840468ec5e2229 Mon Sep 17 00:00:00 2001 From: Norman Mu Date: Mon, 8 Aug 2016 10:44:10 -0700 Subject: [PATCH] [AIRFLOW-78] airflow clear leaves dag_runs Fix a bug in the scheduler where dag runs cleared via CLI would be picked up without checking max_active_dag_runs first, resulting in too many simultaneous dag runs. --- airflow/jobs.py | 4 ++++ airflow/models.py | 9 +++++++++ tests/jobs.py | 18 ++++++++++++++---- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index c07c4115629d3..95804350cd315 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1057,6 +1057,10 @@ def _process_dags(self, dagbag, dags, tis_out): """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) + if dag.reached_max_runs: + self.logger.info("Not processing DAG {} since its max runs has been reached" + .format(dag.dag_id)) + continue if dag.is_paused: self.logger.info("Not processing DAG {} since it's paused" .format(dag.dag_id)) diff --git a/airflow/models.py b/airflow/models.py index 182f7cc364018..4ccf2a1baa3eb 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -2879,6 +2879,15 @@ def subdags(self): l += task.subdag.subdags return l + @property + def reached_max_runs(self): + active_runs = DagRun.find( + dag_id=self.dag_id, + state=State.RUNNING, + external_trigger=False + ) + return len(active_runs) >= self.max_active_runs + def resolve_template_files(self): for t in self.tasks: t.resolve_template_files() diff --git a/tests/jobs.py b/tests/jobs.py index e86b9dadd18af..351268a59293a 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -20,6 +20,7 @@ import datetime import logging import os +import time import unittest from airflow import AirflowException, settings @@ -609,14 +610,23 @@ def test_scheduler_verify_max_active_runs(self): session.commit() session.close() - scheduler = SchedulerJob() - dag.clear() + scheduler = SchedulerJob(dag.dag_id, + run_duration=1) dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.create_dag_run(dag) - self.assertIsNone(dr) + dr2 = scheduler.create_dag_run(dag) + self.assertIsNone(dr2) + + dag.clear() + + dag.max_active_runs = 0 + scheduler.run() + + session = settings.Session() + self.assertEqual( + len(session.query(TI).filter(TI.dag_id == dag.dag_id).all()), 0) def test_scheduler_fail_dagrun_timeout(self): """