diff --git a/.gitignore b/.gitignore index 48af479ee14a9..ca6de2b0644fe 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ .DS_Store .ipynb* .coverage +.python-version airflow/git_version airflow/www/static/coverage/ airflow.db diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py new file mode 100644 index 0000000000000..9ce03b9aa6a7f --- /dev/null +++ b/airflow/example_dags/example_latest_only.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Example of the LatestOnlyOperator +""" +import datetime as dt + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.latest_only_operator import LatestOnlyOperator +from airflow.utils.trigger_rule import TriggerRule + + +dag = DAG( + dag_id='latest_only', + schedule_interval=dt.timedelta(hours=4), + start_date=dt.datetime(2016, 9, 20), +) + +latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) + +task1 = DummyOperator(task_id='task1', dag=dag) +task1.set_upstream(latest_only) diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py new file mode 100644 index 0000000000000..e3a88b7b0b85b --- /dev/null +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Example LatestOnlyOperator and TriggerRule interactions +""" +import datetime as dt + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.latest_only_operator import LatestOnlyOperator +from airflow.utils.trigger_rule import TriggerRule + + +dag = DAG( + dag_id='latest_only_with_trigger', + schedule_interval=dt.timedelta(hours=4), + start_date=dt.datetime(2016, 9, 20), +) + +latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) + +task1 = DummyOperator(task_id='task1', dag=dag) +task1.set_upstream(latest_only) + +task2 = DummyOperator(task_id='task2', dag=dag) + +task3 = DummyOperator(task_id='task3', dag=dag) +task3.set_upstream([task1, task2]) + +task4 = DummyOperator(task_id='task4', dag=dag, + trigger_rule=TriggerRule.ALL_DONE) +task4.set_upstream([task1, task2]) diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py index f39ad01671cb5..4cfac7b8cb451 100644 --- a/airflow/operators/__init__.py +++ b/airflow/operators/__init__.py @@ -57,6 +57,7 @@ 'dummy_operator': ['DummyOperator'], 'email_operator': ['EmailOperator'], 'hive_to_samba_operator': ['Hive2SambaOperator'], + 'latest_only_operator': ['LatestOnlyOperator'], 'mysql_operator': ['MySqlOperator'], 'sqlite_operator': ['SqliteOperator'], 'mysql_to_hive': ['MySqlToHiveTransfer'], diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py new file mode 100644 index 0000000000000..49ba2a339533c --- /dev/null +++ b/airflow/operators/latest_only_operator.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import logging + +from airflow.models import BaseOperator, TaskInstance +from airflow.utils.state import State +from airflow import settings + + +class LatestOnlyOperator(BaseOperator): + """ + Allows a workflow to skip tasks that are not running during the most + recent schedule interval. + + If the task is run outside of the latest schedule interval, all + directly downstream tasks will be skipped. + """ + + ui_color = '#e9ffdb' # nyanza + + def execute(self, context): + now = datetime.datetime.now() + left_window = context['dag'].following_schedule( + context['execution_date']) + right_window = context['dag'].following_schedule(left_window) + logging.info( + 'Checking latest only with left_window: %s right_window: %s ' + 'now: %s', left_window, right_window, now) + if not left_window < now <= right_window: + logging.info('Not latest execution, skipping downstream.') + session = settings.Session() + for task in context['task'].downstream_list: + ti = TaskInstance( + task, execution_date=context['ti'].execution_date) + logging.info('Skipping task: %s', ti.task_id) + ti.state = State.SKIPPED + ti.start_date = now + ti.end_date = now + session.merge(ti) + session.commit() + session.close() + logging.info('Done.') + else: + logging.info('Latest, allowing execution to proceed.') diff --git a/docs/concepts.rst b/docs/concepts.rst index 8cfc8aba74b61..82d52488ccc98 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -594,6 +594,80 @@ that, when set to ``True``, keeps a task from getting triggered if the previous schedule for the task hasn't succeeded. +Latest Run Only +=============== + +Standard workflow behavior involves running a series of tasks for a +particular date/time range. Some workflows, however, perform tasks that +are independent of run time but need to be run on a schedule, much like a +standard cron job. In these cases, backfills or running jobs missed during +a pause just wastes CPU cycles. + +For situations like this, you can use the ``LatestOnlyOperator`` to skip +tasks that are not being run during the most recent scheduled run for a +DAG. The ``LatestOnlyOperator`` skips all immediate downstream tasks, and +itself, if the time right now is not between its ``execution_time`` and the +next scheduled ``execution_time``. + +One must be aware of the interaction between skipped tasks and trigger +rules. Skipped tasks will cascade through trigger rules ``all_success`` +and ``all_failed`` but not ``all_done``, ``one_failed``, ``one_success``, +and ``dummy``. If you would like to use the ``LatestOnlyOperator`` with +trigger rules that do not cascade skips, you will need to ensure that the +``LatestOnlyOperator`` is **directly** upstream of the task you would like +to skip. + +It is possible, through use of trigger rules to mix tasks that should run +in the typical date/time dependent mode and those using the +``LatestOnlyOperator``. + +For example, consider the following dag: + +.. code:: python + + #dags/latest_only_with_trigger.py + import datetime as dt + + from airflow.models import DAG + from airflow.operators.dummy_operator import DummyOperator + from airflow.operators.latest_only_operator import LatestOnlyOperator + from airflow.utils.trigger_rule import TriggerRule + + + dag = DAG( + dag_id='latest_only_with_trigger', + schedule_interval=dt.timedelta(hours=4), + start_date=dt.datetime(2016, 9, 20), + ) + + latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) + + task1 = DummyOperator(task_id='task1', dag=dag) + task1.set_upstream(latest_only) + + task2 = DummyOperator(task_id='task2', dag=dag) + + task3 = DummyOperator(task_id='task3', dag=dag) + task3.set_upstream([task1, task2]) + + task4 = DummyOperator(task_id='task4', dag=dag, + trigger_rule=TriggerRule.ALL_DONE) + task4.set_upstream([task1, task2]) + +In the case of this dag, the ``latest_only`` task will show up as skipped +for all runs except the latest run. ``task1`` is directly downstream of +``latest_only`` and will also skip for all runs except the latest. +``task2`` is entirely independent of ``latest_only`` and will run in all +scheduled periods. ``task3`` is downstream of ``task1`` and ``task2`` and +because of the default ``trigger_rule`` being ``all_success`` will receive +a cascaded skip from ``task1``. ``task4`` is downstream of ``task1`` and +``task2`` but since its ``trigger_rule`` is set to ``all_done`` it will +trigger as soon as ``task1`` has been skipped (a valid completion state) +and ``task2`` has succeeded. + +.. image:: img/latest_only_with_trigger.png + + Zombies & Undeads ================= diff --git a/docs/img/latest_only_with_trigger.png b/docs/img/latest_only_with_trigger.png new file mode 100644 index 0000000000000..629adfa907964 Binary files /dev/null and b/docs/img/latest_only_with_trigger.png differ diff --git a/setup.py b/setup.py index 225573465846c..a63c1beb37d9e 100644 --- a/setup.py +++ b/setup.py @@ -159,7 +159,7 @@ def write_version(filename=os.path.join(*['airflow', cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant -devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto'] +devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto', 'freezegun'] devel_minreq = devel + mysql + doc + password + s3 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker diff --git a/tests/core.py b/tests/core.py index e443a03b1f585..cffdc1f284c7d 100644 --- a/tests/core.py +++ b/tests/core.py @@ -60,7 +60,7 @@ import six -NUM_EXAMPLE_DAGS = 16 +NUM_EXAMPLE_DAGS = 18 DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py new file mode 100644 index 0000000000000..37aec38d4cac5 --- /dev/null +++ b/tests/operators/latest_only_operator.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function, unicode_literals + +import datetime +import logging +import unittest + +from airflow import configuration, DAG, settings +from airflow.jobs import BackfillJob +from airflow.models import TaskInstance +from airflow.operators.latest_only_operator import LatestOnlyOperator +from airflow.operators.dummy_operator import DummyOperator +from freezegun import freeze_time + +DEFAULT_DATE = datetime.datetime(2016, 1, 1) +END_DATE = datetime.datetime(2016, 1, 2) +INTERVAL = datetime.timedelta(hours=12) +FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1) + + +def get_task_instances(task_id): + session = settings.Session() + return session \ + .query(TaskInstance) \ + .filter(TaskInstance.task_id == task_id) \ + .order_by(TaskInstance.execution_date) \ + .all() + + +class LatestOnlyOperatorTest(unittest.TestCase): + + def setUp(self): + super(LatestOnlyOperatorTest, self).setUp() + configuration.load_test_config() + self.dag = DAG( + 'test_dag', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE}, + schedule_interval=INTERVAL) + self.addCleanup(self.dag.clear) + freezer = freeze_time(FROZEN_NOW) + freezer.start() + self.addCleanup(freezer.stop) + + def test_run(self): + task = LatestOnlyOperator( + task_id='latest', + dag=self.dag) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_skipping(self): + latest_task = LatestOnlyOperator( + task_id='latest', + dag=self.dag) + downstream_task = DummyOperator( + task_id='downstream', + dag=self.dag) + downstream_task.set_upstream(latest_task) + + latest_task.run(start_date=DEFAULT_DATE, end_date=END_DATE) + downstream_task.run(start_date=DEFAULT_DATE, end_date=END_DATE) + + latest_instances = get_task_instances('latest') + exec_date_to_latest_state = { + ti.execution_date: ti.state for ti in latest_instances} + assert exec_date_to_latest_state == { + datetime.datetime(2016, 1, 1): 'success', + datetime.datetime(2016, 1, 1, 12): 'success', + datetime.datetime(2016, 1, 2): 'success', + } + + downstream_instances = get_task_instances('downstream') + exec_date_to_downstream_state = { + ti.execution_date: ti.state for ti in downstream_instances} + assert exec_date_to_downstream_state == { + datetime.datetime(2016, 1, 1): 'skipped', + datetime.datetime(2016, 1, 1, 12): 'skipped', + datetime.datetime(2016, 1, 2): 'success', + }