diff --git a/airflow/jobs.py b/airflow/jobs.py index 944d847a00924..f08efbc49473d 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -677,103 +677,105 @@ def clear_import_errors(session): def create_dag_run(self, dag, session=None): """ This method checks whether a new DagRun needs to be created - for a DAG based on scheduling interval + for dag based on scheduling interval Returns DagRun if one is scheduled. Otherwise returns None. """ - if dag.schedule_interval: - active_runs = DagRun.find( - dag_id=dag.dag_id, - state=State.RUNNING, - external_trigger=False, - session=session - ) - # return if already reached maximum active runs and no timeout setting - if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout: - return - timedout_runs = 0 - for dr in active_runs: - if ( - dr.start_date and dag.dagrun_timeout and - dr.start_date < datetime.now() - dag.dagrun_timeout): - dr.state = State.FAILED - dr.end_date = datetime.now() - timedout_runs += 1 - session.commit() - if len(active_runs) - timedout_runs >= dag.max_active_runs: - return + if not dag.schedule_interval: + return - # this query should be replaced by find dagrun - qry = ( - session.query(func.max(DagRun.execution_date)) - .filter_by(dag_id=dag.dag_id) - .filter(or_( - DagRun.external_trigger == False, - # add % as a wildcard for the like query - DagRun.run_id.like(DagRun.ID_PREFIX+'%') - )) - ) - last_scheduled_run = qry.scalar() - - # don't schedule @once again - if dag.schedule_interval == '@once' and last_scheduled_run: - return None - - next_run_date = None - if not last_scheduled_run: - # First run - task_start_dates = [t.start_date for t in dag.tasks] - if task_start_dates: - next_run_date = dag.normalize_schedule(min(task_start_dates)) - self.logger.debug("Next run date based on tasks {}" - .format(next_run_date)) - else: - next_run_date = dag.following_schedule(last_scheduled_run) - - # make sure backfills are also considered - last_run = dag.get_last_dagrun(session=session) - if last_run and next_run_date: - while next_run_date <= last_run.execution_date: - next_run_date = dag.following_schedule(next_run_date) - - # don't ever schedule prior to the dag's start_date - if dag.start_date: - next_run_date = (dag.start_date if not next_run_date - else max(next_run_date, dag.start_date)) - if next_run_date == dag.start_date: - next_run_date = dag.normalize_schedule(dag.start_date) - - self.logger.debug("Dag start date: {}. Next run date: {}" - .format(dag.start_date, next_run_date)) - - # this structure is necessary to avoid a TypeError from concatenating - # NoneType - if dag.schedule_interval == '@once': - period_end = next_run_date - elif next_run_date: - period_end = dag.following_schedule(next_run_date) - - # Don't schedule a dag beyond its end_date (as specified by the dag param) - if next_run_date and dag.end_date and next_run_date > dag.end_date: - return + active_runs = DagRun.find( + dag_id=dag.dag_id, + state=State.RUNNING, + external_trigger=False, + session=session + ) + # return if already reached maximum active runs and no timeout setting + if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout: + return + timedout_runs = 0 + for dr in active_runs: + if ( + dr.start_date and dag.dagrun_timeout and + dr.start_date < datetime.now() - dag.dagrun_timeout): + dr.state = State.FAILED + dr.end_date = datetime.now() + timedout_runs += 1 + session.commit() + if len(active_runs) - timedout_runs >= dag.max_active_runs: + return - # Don't schedule a dag beyond its end_date (as specified by the task params) - # Get the min task end date, which may come from the dag.default_args - min_task_end_date = [] - task_end_dates = [t.end_date for t in dag.tasks if t.end_date] - if task_end_dates: - min_task_end_date = min(task_end_dates) - if next_run_date and min_task_end_date and next_run_date > min_task_end_date: - return + # this query should be replaced by find dagrun + qry = ( + session.query(func.max(DagRun.execution_date)) + .filter_by(dag_id=dag.dag_id) + .filter(or_( + DagRun.external_trigger == False, + # add % as a wildcard for the like query + DagRun.run_id.like(DagRun.ID_PREFIX+'%') + )) + ) + last_scheduled_run = qry.scalar() + + # don't schedule @once again + if dag.schedule_interval == '@once' and last_scheduled_run: + return None + + next_run_date = None + if not last_scheduled_run: + # First run + task_start_dates = [t.start_date for t in dag.tasks] + if task_start_dates: + next_run_date = dag.normalize_schedule(min(task_start_dates)) + self.logger.debug("Next run date based on tasks {}" + .format(next_run_date)) + else: + next_run_date = dag.following_schedule(last_scheduled_run) + + # make sure backfills are also considered + last_run = dag.get_last_dagrun(session=session) + if last_run and next_run_date: + while next_run_date <= last_run.execution_date: + next_run_date = dag.following_schedule(next_run_date) + + # don't ever schedule prior to the dag's start_date + if dag.start_date: + next_run_date = (dag.start_date if not next_run_date + else max(next_run_date, dag.start_date)) + if next_run_date == dag.start_date: + next_run_date = dag.normalize_schedule(dag.start_date) + + self.logger.debug("Dag start date: {}. Next run date: {}" + .format(dag.start_date, next_run_date)) + + # this structure is necessary to avoid a TypeError from concatenating + # NoneType + if dag.schedule_interval == '@once': + period_end = next_run_date + elif next_run_date: + period_end = dag.following_schedule(next_run_date) + + # Don't schedule a dag beyond its end_date (as specified by the dag param) + if next_run_date and dag.end_date and next_run_date > dag.end_date: + return - if next_run_date and period_end and period_end <= datetime.now(): - next_run = dag.create_dagrun( - run_id='scheduled__' + next_run_date.isoformat(), - execution_date=next_run_date, - start_date=datetime.now(), - state=State.RUNNING, - external_trigger=False - ) - return next_run + # Don't schedule a dag beyond its end_date (as specified by the task params) + # Get the min task end date, which may come from the dag.default_args + min_task_end_date = [] + task_end_dates = [t.end_date for t in dag.tasks if t.end_date] + if task_end_dates: + min_task_end_date = min(task_end_dates) + if next_run_date and min_task_end_date and next_run_date > min_task_end_date: + return + + if next_run_date and period_end and period_end <= datetime.now(): + next_run = dag.create_dagrun( + run_id='scheduled__' + next_run_date.isoformat(), + execution_date=next_run_date, + start_date=datetime.now(), + state=State.RUNNING, + external_trigger=False + ) + return next_run def _process_task_instances(self, dag, queue): """ @@ -1058,9 +1060,10 @@ def _process_dags(self, dagbag, dags, tis_out): """ Iterates over the dags and processes them. Processing includes: - 1. Create appropriate DagRun(s) in the DB. - 2. Create appropriate TaskInstance(s) in the DB. - 3. Send emails for tasks that have missed SLAs. + 1. Create DagRun in the DB if needed. + 2. Create appropriate TaskInstances in the DB for existing DagRuns. + 3. Change the state of the DagRun if needed. + 4. Send emails for tasks that have missed SLAs. :param dagbag: a collection of DAGs to process :type dagbag: models.DagBag @@ -1072,28 +1075,32 @@ def _process_dags(self, dagbag, dags, tis_out): """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) - if dag.reached_max_runs: - self.logger.info("Not processing DAG {} since its max runs has been reached" - .format(dag.dag_id)) - continue if dag.is_paused: - self.logger.info("Not processing DAG {} since it's paused" - .format(dag.dag_id)) + self.logger.info( + "Not processing DAG {} since it's paused", dag.dag_id) continue if not dag: - self.logger.error("DAG ID {} was not found in the DagBag" - .format(dag.dag_id)) + self.logger.error( + "DAG ID {} was not found in the DagBag", dag.dag_id) continue - self.logger.info("Processing {}".format(dag.dag_id)) + self.logger.info("Processing {}", dag.dag_id) - dag_run = self.create_dag_run(dag) - if dag_run: - self.logger.info("Created {}".format(dag_run)) self._process_task_instances(dag, tis_out) self.manage_slas(dag) + if dag.reached_max_runs: + self.logger.info( + "Not attempting to create new DagRun for {} since its max runs has been reached" + dag.dag_id + ) + else: + # Try to create a new dag run + dag_run = self.create_dag_run(dag) + if dag_run: + self.logger.info("Created {}".format(dag_run)) + def _process_executor_events(self): """ Respond to executor events.