From 5a242a73d502f3bf0a824320b886f35eb11a2bc8 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 4 Mar 2024 21:50:25 +0100 Subject: [PATCH 1/7] Pass task output as outlet to dataset trigger params --- airflow/example_dags/example_datasets.py | 11 +++++++++-- airflow/jobs/scheduler_job_runner.py | 8 ++++++++ airflow/models/taskinstance.py | 13 +++++++++---- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index ac7cc2b3c1702..15586063edc4e 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -51,18 +51,21 @@ """ from __future__ import annotations +import random + import pendulum from airflow.datasets import Dataset from airflow.models.dag import DAG from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator from airflow.timetables.datasets import DatasetOrTimeSchedule from airflow.timetables.trigger import CronTriggerTimetable # [START dataset_def] dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] -dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"}) +dag2_dataset = Dataset("s3://dag2/output_1.txt") dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( @@ -83,7 +86,11 @@ schedule=None, tags=["produces", "dataset-scheduled"], ) as dag2: - BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5") + + def some_python_callable(): + return {"some_context": "dynamic data 123", "random_number": random.randint(1, 100)} + + PythonOperator(python_callable=some_python_callable, outlets=[dag2_dataset], task_id="producing_task_2") # [START dag_dep] with DAG( diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 32cc9f5a634ab..040d59860c7e4 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ 
-1284,9 +1284,17 @@ def _create_dag_runs_dataset_triggered( events=dataset_events, ) + run_conf = {} + for item in dataset_events: + event: DatasetEvent = item + extra: dict | None = event.extra + if extra: + run_conf.update(extra) + dag_run = dag.create_dagrun( run_id=run_id, run_type=DagRunType.DATASET_TRIGGERED, + conf=run_conf, execution_date=exec_date, data_interval=data_interval, state=DagRunState.QUEUED, diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 57c9483cd4ee7..da9772964584b 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2374,7 +2374,7 @@ def _run_raw_task( try: if not mark_success: - self._execute_task_with_callbacks(context, test_mode, session=session) + result = self._execute_task_with_callbacks(context, test_mode, session=session) if not test_mode: self.refresh_from_db(lock_for_update=True, session=session) self.state = TaskInstanceState.SUCCESS @@ -2462,7 +2462,7 @@ def _run_raw_task( session.add(Log(self.state, self)) session.merge(self).task = self.task if self.state == TaskInstanceState.SUCCESS: - self._register_dataset_changes(session=session) + self._register_dataset_changes(result, session=session) session.commit() if self.state == TaskInstanceState.SUCCESS: @@ -2472,7 +2472,7 @@ def _run_raw_task( return None - def _register_dataset_changes(self, *, session: Session) -> None: + def _register_dataset_changes(self, result: Any, *, session: Session) -> None: for obj in self.task.outlets or []: self.log.debug("outlet obj %s", obj) # Lineage can have other types of objects besides datasets @@ -2480,10 +2480,13 @@ def _register_dataset_changes(self, *, session: Session) -> None: dataset_manager.register_dataset_change( task_instance=self, dataset=obj, + extra=obj.extra or result if isinstance(result, dict) else {self.task_id: result}, session=session, ) - def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session): + def 
_execute_task_with_callbacks( + self, context: Context, test_mode: bool = False, *, session: Session + ) -> Any: """Prepare Task for Execution.""" from airflow.models.renderedtifields import RenderedTaskInstanceFields @@ -2571,6 +2574,8 @@ def signal_handler(signum, frame): Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type}) Stats.incr("ti_successes", tags=self.stats_tags) + return result + def _execute_task(self, context, task_orig): """ Execute Task (optionally with a Timeout) and push Xcom results. From fcae73f2e99ecbb96e7ce927e1db517505d58930 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 19:41:59 +0100 Subject: [PATCH 2/7] Rework code proposal --- airflow/datasets/__init__.py | 3 +++ airflow/example_dags/example_datasets.py | 2 +- airflow/jobs/scheduler_job_runner.py | 10 ++++------ airflow/models/taskinstance.py | 8 +++++++- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 2507c69d01b43..e853c0808428a 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -114,6 +114,9 @@ class Dataset(os.PathLike, BaseDatasetEventInput): validator=[attr.validators.min_len(1), attr.validators.max_len(3000)], ) extra: dict[str, Any] | None = None + """Static extra dictionary to be passed with event.""" + extra_from_return: bool = False + """Use task return value as extra value.""" __version__: ClassVar[int] = 1 diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 8e7ccb4c279ae..aa085c3e634d1 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -66,7 +66,7 @@ # [START dataset_def] dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] -dag2_dataset = Dataset("s3://dag2/output_1.txt") +dag2_dataset = Dataset("s3://dag2/output_1.txt", extra_from_return=True) dag3_dataset = 
Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 0084aef11107f..d0b8faff30e81 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1264,7 +1264,7 @@ def _create_dag_runs_dataset_triggered( if previous_dag_run: dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date) - dataset_events = session.scalars( + dataset_events: Collection[DatasetEvent] = session.scalars( select(DatasetEvent) .join( DagScheduleDatasetReference, @@ -1283,11 +1283,9 @@ def _create_dag_runs_dataset_triggered( ) run_conf = {} - for item in dataset_events: - event: DatasetEvent = item - extra: dict | None = event.extra - if extra: - run_conf.update(extra) + for event in dataset_events: + if event.extra: + run_conf.update(event.extra) dag_run = dag.create_dagrun( run_id=run_id, diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index b53dc97c3dd89..2e9291194c3ae 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2561,10 +2561,16 @@ def _register_dataset_changes(self, result: Any, *, session: Session) -> None: self.log.debug("outlet obj %s", obj) # Lineage can have other types of objects besides datasets if isinstance(obj, Dataset): + if obj.extra: + extra = obj.extra + elif obj.extra_from_return: + extra = result if isinstance(result, dict) else {str(self.task_id): result} + else: + extra = None dataset_manager.register_dataset_change( task_instance=self, dataset=obj, - extra=obj.extra or result if isinstance(result, dict) else {self.task_id: result}, + extra=extra, session=session, ) From 29b7fde84a135d35277c7e0ae90c34acdb7fc049 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 21:00:52 +0100 Subject: [PATCH 3/7] Add documentation --- airflow/example_dags/example_datasets.py | 2 ++ .../authoring-and-scheduling/datasets.rst | 20 
+++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index aa085c3e634d1..85417e6c109e7 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -66,7 +66,9 @@ # [START dataset_def] dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] +# [START dataset_def_extra_return] dag2_dataset = Dataset("s3://dag2/output_1.txt", extra_from_return=True) +# [END dataset_def_extra_return] dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 354a69ff7edb0..8e1f38c8f6f5e 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -128,6 +128,15 @@ This extra information does not affect a dataset's identity. This means a DAG wi ..., ) +In cases the published extra information should be dynamic, you can also set the ``extra_from_return`` to True. In this case the return value of your executing task will be used as extra information to be published: + +.. exampleinclude:: /../../airflow/example_dags/example_datasets.py + :language: python + :start-after: [START dataset_def_extra_return] + :end-before: [END dataset_def_extra_return] + +The extra information is expected to be a dictionary, if the task is not returning a dictionary, then the value is pushed into a dictionary with the task id as key. + .. note:: **Security Note:** Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values! 
How to use datasets in your DAGs @@ -224,6 +233,17 @@ If one dataset is updated multiple times before all consumed datasets have been } +Dataset Event Extra pushed as DAG Run Config +-------------------------------------------- + +Tasks producing events via ``outlets`` parameter can define extra information in form of a static dictionary. +To make this extra information easy to consume, the information from on or multiple events is merged to a common dictionary +per default and provided as :ref:`DAG run configuration ` to the receiving DAG. This means such data can +directly be used by consuming :ref:`DAGs as parameter `. + +As during the merge of extra information some information might be over-ridden you eithe rneed to use unique keys or use the +``triggering_dataset_events`` as described below. + Fetching information from a Triggering Dataset Event ---------------------------------------------------- From 0d0c5e8027ac5449c899b35cbd7476afe20a749a Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 21:41:36 +0100 Subject: [PATCH 4/7] Add documentation --- docs/apache-airflow/authoring-and-scheduling/datasets.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 8e1f38c8f6f5e..f7e2ae0921b70 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -241,7 +241,7 @@ To make this extra information easy to consume, the information from on or multi per default and provided as :ref:`DAG run configuration ` to the receiving DAG. This means such data can directly be used by consuming :ref:`DAGs as parameter `. 
-As during the merge of extra information some information might be over-ridden you eithe rneed to use unique keys or use the +As during the merge of extra information some information might be over-ridden you either need to use unique keys or use the ``triggering_dataset_events`` as described below. Fetching information from a Triggering Dataset Event From 63bedaa6fee5f4c0a1b3d70051cf198461dd5754 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 23:14:46 +0100 Subject: [PATCH 5/7] Fix pytests --- airflow/models/taskinstance.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 2e9291194c3ae..9eb859ceb9fe7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2425,6 +2425,7 @@ def _run_raw_task( self.test_mode = test_mode self.refresh_from_task(self.task, pool_override=pool) self.refresh_from_db(session=session) + result = None self.job_id = job_id self.hostname = get_hostname() From 847a82418a5218a3faeb56788a3d3d334a356043 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 23:29:38 +0100 Subject: [PATCH 6/7] Extend dataset example --- airflow/example_dags/example_datasets.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 85417e6c109e7..999447aea1814 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -58,6 +58,7 @@ from airflow.datasets import Dataset from airflow.models.dag import DAG +from airflow.models.param import Param from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.timetables.datasets import DatasetOrTimeSchedule @@ -91,7 +92,7 @@ ) as dag2: def some_python_callable(): - return {"some_context": "dynamic data 123", "random_number": random.randint(1, 100)} + return 
{"sleep_seconds": random.randint(1, 10), "some_context": "dynamic data 123"} PythonOperator(python_callable=some_python_callable, outlets=[dag2_dataset], task_id="producing_task_2") @@ -116,11 +117,12 @@ def some_python_callable(): start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=[dag1_dataset, dag2_dataset], tags=["consumes", "dataset-scheduled"], + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag4: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consuming_2", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( @@ -136,7 +138,7 @@ def some_python_callable(): BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consuming_3", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( @@ -152,38 +154,41 @@ def some_python_callable(): BashOperator( task_id="unrelated_task", outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")], - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_and_2_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset & dag2_dataset), + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag5: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_and_2_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_or_2_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset | dag2_dataset), + params={"sleep_seconds": Param(5, "A random number to sleep. 
Set by dataset 2 event data.")}, ) as dag6: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_or_2_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag7: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_or_both_2_and_3_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="conditional_dataset_and_time_based_timetable", @@ -193,9 +198,10 @@ def some_python_callable(): timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) ), tags=["dataset-time-based-timetable"], + params={"sleep_seconds": Param(5, "A random number to sleep. 
Set by dataset 2 event data.")}, ) as dag8: BashOperator( outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], task_id="conditional_dataset_and_time_based_timetable", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) From b56299035cc751ad2a63d2c79ee07a00044ddc29 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 23 Mar 2024 23:39:49 +0100 Subject: [PATCH 7/7] Add Newsfragment --- .../apache-airflow/authoring-and-scheduling/datasets.rst | 4 ++++ newsfragments/38432.significant.rst | 9 +++++++++ 2 files changed, 13 insertions(+) create mode 100644 newsfragments/38432.significant.rst diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index f7e2ae0921b70..ebcba4c3b4736 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -233,9 +233,13 @@ If one dataset is updated multiple times before all consumed datasets have been } +.. _datasets:event-extra: + Dataset Event Extra pushed as DAG Run Config -------------------------------------------- +.. versionadded:: 2.10.0 + Tasks producing events via ``outlets`` parameter can define extra information in form of a static dictionary. To make this extra information easy to consume, the information from on or multiple events is merged to a common dictionary per default and provided as :ref:`DAG run configuration ` to the receiving DAG. This means such data can diff --git a/newsfragments/38432.significant.rst b/newsfragments/38432.significant.rst new file mode 100644 index 0000000000000..b2b5c3fd2c6cd --- /dev/null +++ b/newsfragments/38432.significant.rst @@ -0,0 +1,9 @@ +Dataset triggers now provide extra as DAG Run conf. + +With data aware scheduling you can automate triggers between DAGs. 
Passing context information was so far possible via +the name of the Dataset or by passing ``extra`` information to a downstream DAG. Retrieving the ``extra`` context +was only possible via the ``triggering_dataset_events`` context object and iterating over the details. With the new +version of Airflow the ``extra`` is merged into a dictionary and passed to the DAG directly as DAG Run conf so that +DAG params can directly be used. + +See :ref:`Dataset Event Extra pushed as DAG Run Config <datasets:event-extra>` for more information.