Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 114 additions & 107 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,103 +677,105 @@ def clear_import_errors(session):
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
for dag based on scheduling interval
Returns DagRun if one is scheduled. Otherwise returns None.
"""
if dag.schedule_interval:
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False,
session=session
)
# return if already reached maximum active runs and no timeout setting
if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
return
timedout_runs = 0
for dr in active_runs:
if (
dr.start_date and dag.dagrun_timeout and
dr.start_date < datetime.now() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = datetime.now()
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
return
if not dag.schedule_interval:
return

# this query should be replaced by find dagrun
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')
))
)
last_scheduled_run = qry.scalar()

# don't schedule @once again
if dag.schedule_interval == '@once' and last_scheduled_run:
return None

next_run_date = None
if not last_scheduled_run:
# First run
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
next_run_date = dag.normalize_schedule(min(task_start_dates))
self.logger.debug("Next run date based on tasks {}"
.format(next_run_date))
else:
next_run_date = dag.following_schedule(last_scheduled_run)

# make sure backfills are also considered
last_run = dag.get_last_dagrun(session=session)
if last_run and next_run_date:
while next_run_date <= last_run.execution_date:
next_run_date = dag.following_schedule(next_run_date)

# don't ever schedule prior to the dag's start_date
if dag.start_date:
next_run_date = (dag.start_date if not next_run_date
else max(next_run_date, dag.start_date))
if next_run_date == dag.start_date:
next_run_date = dag.normalize_schedule(dag.start_date)

self.logger.debug("Dag start date: {}. Next run date: {}"
.format(dag.start_date, next_run_date))

# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
period_end = next_run_date
elif next_run_date:
period_end = dag.following_schedule(next_run_date)

# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
return
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False,
session=session
)
# return if already reached maximum active runs and no timeout setting
if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
return
timedout_runs = 0
for dr in active_runs:
if (
dr.start_date and dag.dagrun_timeout and
dr.start_date < datetime.now() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = datetime.now()
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
return

# Don't schedule a dag beyond its end_date (as specified by the task params)
# Get the min task end date, which may come from the dag.default_args
min_task_end_date = []
task_end_dates = [t.end_date for t in dag.tasks if t.end_date]
if task_end_dates:
min_task_end_date = min(task_end_dates)
if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
return
# this query should be replaced by find dagrun
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')
))
)
last_scheduled_run = qry.scalar()

# don't schedule @once again
if dag.schedule_interval == '@once' and last_scheduled_run:
return None

next_run_date = None
if not last_scheduled_run:
# First run
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
next_run_date = dag.normalize_schedule(min(task_start_dates))
self.logger.debug("Next run date based on tasks {}"
.format(next_run_date))
else:
next_run_date = dag.following_schedule(last_scheduled_run)

# make sure backfills are also considered
last_run = dag.get_last_dagrun(session=session)
if last_run and next_run_date:
while next_run_date <= last_run.execution_date:
next_run_date = dag.following_schedule(next_run_date)

# don't ever schedule prior to the dag's start_date
if dag.start_date:
next_run_date = (dag.start_date if not next_run_date
else max(next_run_date, dag.start_date))
if next_run_date == dag.start_date:
next_run_date = dag.normalize_schedule(dag.start_date)

self.logger.debug("Dag start date: {}. Next run date: {}"
.format(dag.start_date, next_run_date))

# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
period_end = next_run_date
elif next_run_date:
period_end = dag.following_schedule(next_run_date)

# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
return

if next_run_date and period_end and period_end <= datetime.now():
next_run = dag.create_dagrun(
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=datetime.now(),
state=State.RUNNING,
external_trigger=False
)
return next_run
# Don't schedule a dag beyond its end_date (as specified by the task params)
# Get the min task end date, which may come from the dag.default_args
min_task_end_date = []
task_end_dates = [t.end_date for t in dag.tasks if t.end_date]
if task_end_dates:
min_task_end_date = min(task_end_dates)
if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
return

if next_run_date and period_end and period_end <= datetime.now():
next_run = dag.create_dagrun(
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=datetime.now(),
state=State.RUNNING,
external_trigger=False
)
return next_run

def _process_task_instances(self, dag, queue):
"""
Expand Down Expand Up @@ -1058,9 +1060,10 @@ def _process_dags(self, dagbag, dags, tis_out):
"""
Iterates over the dags and processes them. Processing includes:

1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs.
1. Create DagRun in the DB if needed.
2. Create appropriate TaskInstances in the DB for existing DagRuns.
3. Change the state of the DagRun if needed.
4. Send emails for tasks that have missed SLAs.

:param dagbag: a collection of DAGs to process
:type dagbag: models.DagBag
Expand All @@ -1072,28 +1075,32 @@ def _process_dags(self, dagbag, dags, tis_out):
"""
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
if dag.reached_max_runs:
self.logger.info("Not processing DAG {} since its max runs has been reached"
.format(dag.dag_id))
continue
if dag.is_paused:
self.logger.info("Not processing DAG {} since it's paused"
.format(dag.dag_id))
self.logger.info(
"Not processing DAG {} since it's paused", dag.dag_id)
continue

if not dag:
self.logger.error("DAG ID {} was not found in the DagBag"
.format(dag.dag_id))
self.logger.error(
"DAG ID {} was not found in the DagBag", dag.dag_id)
continue

self.logger.info("Processing {}".format(dag.dag_id))
self.logger.info("Processing {}", dag.dag_id)

dag_run = self.create_dag_run(dag)
if dag_run:
self.logger.info("Created {}".format(dag_run))
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)

if dag.reached_max_runs:
self.logger.info(
"Not attempting to create new DagRun for {} since its max runs has been reached"
dag.dag_id
)
else:
# Try to create a new dag run
dag_run = self.create_dag_run(dag)
if dag_run:
self.logger.info("Created {}".format(dag_run))

def _process_executor_events(self):
"""
Respond to executor events.
Expand Down