diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 2507c69d01b43..e853c0808428a 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -114,6 +114,9 @@ class Dataset(os.PathLike, BaseDatasetEventInput): validator=[attr.validators.min_len(1), attr.validators.max_len(3000)], ) extra: dict[str, Any] | None = None + """Static extra dictionary to be passed with event.""" + extra_from_return: bool = False + """Use task return value as extra value.""" __version__: ClassVar[int] = 1 diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 54f15d8a2d802..999447aea1814 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -52,18 +52,24 @@ from __future__ import annotations +import random + import pendulum from airflow.datasets import Dataset from airflow.models.dag import DAG +from airflow.models.param import Param from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator from airflow.timetables.datasets import DatasetOrTimeSchedule from airflow.timetables.trigger import CronTriggerTimetable # [START dataset_def] dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] -dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"}) +# [START dataset_def_extra_return] +dag2_dataset = Dataset("s3://dag2/output_1.txt", extra_from_return=True) +# [END dataset_def_extra_return] dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( @@ -84,7 +90,11 @@ schedule=None, tags=["produces", "dataset-scheduled"], ) as dag2: - BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5") + + def some_python_callable(): + return {"sleep_seconds": random.randint(1, 10), "some_context": "dynamic data 123"} + + PythonOperator(python_callable=some_python_callable, outlets=[dag2_dataset], task_id="producing_task_2") 
# [START dag_dep] with DAG( @@ -107,11 +117,12 @@ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=[dag1_dataset, dag2_dataset], tags=["consumes", "dataset-scheduled"], + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag4: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consuming_2", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( @@ -127,7 +138,7 @@ BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consuming_3", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( @@ -143,38 +154,41 @@ BashOperator( task_id="unrelated_task", outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")], - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_and_2_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset & dag2_dataset), + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag5: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_and_2_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_or_2_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset | dag2_dataset), + params={"sleep_seconds": Param(5, "A random number to sleep. 
Set by dataset 2 event data.")}, ) as dag6: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_or_2_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), + params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")}, ) as dag7: BashOperator( outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], task_id="consume_1_or_both_2_and_3_with_dataset_expressions", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) with DAG( dag_id="conditional_dataset_and_time_based_timetable", @@ -184,9 +198,10 @@ timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) ), tags=["dataset-time-based-timetable"], + params={"sleep_seconds": Param(5, "A random number to sleep. 
Set by dataset 2 event data.")}, ) as dag8: BashOperator( outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], task_id="conditional_dataset_and_time_based_timetable", - bash_command="sleep 5", + bash_command="sleep {{ params.sleep_seconds }}", ) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 0596e7f59f25b..d0b8faff30e81 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1264,7 +1264,7 @@ def _create_dag_runs_dataset_triggered( if previous_dag_run: dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date) - dataset_events = session.scalars( + dataset_events: Collection[DatasetEvent] = session.scalars( select(DatasetEvent) .join( DagScheduleDatasetReference, @@ -1282,9 +1282,15 @@ def _create_dag_runs_dataset_triggered( events=dataset_events, ) + run_conf = {} + for event in dataset_events: + if event.extra: + run_conf.update(event.extra) + dag_run = dag.create_dagrun( run_id=run_id, run_type=DagRunType.DATASET_TRIGGERED, + conf=run_conf, execution_date=exec_date, data_interval=data_interval, state=DagRunState.QUEUED, diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 696ee98d6f439..9eb859ceb9fe7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2425,6 +2425,7 @@ def _run_raw_task( self.test_mode = test_mode self.refresh_from_task(self.task, pool_override=pool) self.refresh_from_db(session=session) + result = None self.job_id = job_id self.hostname = get_hostname() @@ -2455,7 +2456,7 @@ def _run_raw_task( try: if not mark_success: - self._execute_task_with_callbacks(context, test_mode, session=session) + result = self._execute_task_with_callbacks(context, test_mode, session=session) if not test_mode: self.refresh_from_db(lock_for_update=True, session=session) self.state = TaskInstanceState.SUCCESS @@ -2543,7 +2544,7 @@ def _run_raw_task( 
session.add(Log(self.state, self)) session.merge(self).task = self.task if self.state == TaskInstanceState.SUCCESS: - self._register_dataset_changes(session=session) + self._register_dataset_changes(result, session=session) session.commit() if self.state == TaskInstanceState.SUCCESS: @@ -2553,7 +2554,7 @@ def _run_raw_task( return None - def _register_dataset_changes(self, *, session: Session) -> None: + def _register_dataset_changes(self, result: Any, *, session: Session) -> None: if TYPE_CHECKING: assert self.task @@ -2561,13 +2562,22 @@ def _register_dataset_changes(self, *, session: Session) -> None: self.log.debug("outlet obj %s", obj) # Lineage can have other types of objects besides datasets if isinstance(obj, Dataset): + if obj.extra: + extra = obj.extra + elif obj.extra_from_return: + extra = result if isinstance(result, dict) else {str(self.task_id): result} + else: + extra = None dataset_manager.register_dataset_change( task_instance=self, dataset=obj, + extra=extra, session=session, ) - def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session): + def _execute_task_with_callbacks( + self, context: Context, test_mode: bool = False, *, session: Session + ) -> Any: """Prepare Task for Execution.""" from airflow.models.renderedtifields import RenderedTaskInstanceFields @@ -2658,6 +2668,8 @@ def signal_handler(signum, frame): Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type}) Stats.incr("ti_successes", tags=self.stats_tags) + return result + def _execute_task(self, context, task_orig): """ Execute Task (optionally with a Timeout) and push Xcom results. 
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 354a69ff7edb0..ebcba4c3b4736 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -128,6 +128,15 @@ This extra information does not affect a dataset's identity. This means a DAG wi ..., ) +In case the published extra information should be dynamic, you can also set ``extra_from_return`` to ``True``. In this case, the return value of the executing task is used as the extra information to be published: + +.. exampleinclude:: /../../airflow/example_dags/example_datasets.py + :language: python + :start-after: [START dataset_def_extra_return] + :end-before: [END dataset_def_extra_return] + +The extra information is expected to be a dictionary. If the task does not return a dictionary, the return value is placed into a dictionary with the task id as key. + .. note:: **Security Note:** Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values! How to use datasets in your DAGs @@ -224,6 +233,21 @@ If one dataset is updated multiple times before all consumed datasets have been } +.. _datasets:event-extra: + +Dataset Event Extra pushed as DAG Run Config +-------------------------------------------- + +.. versionadded:: 2.10.0 + +Tasks producing events via the ``outlets`` parameter can define extra information in the form of a static dictionary. +To make this extra information easy to consume, the information from one or multiple events is merged into a common dictionary +by default and provided as :ref:`DAG run configuration ` to the receiving DAG. This means such data can +directly be used by consuming :ref:`DAGs as parameter `. 
+ +Because some information might be overridden when extra information from multiple events is merged, you need to either use unique keys or use +``triggering_dataset_events`` as described below. + Fetching information from a Triggering Dataset Event ---------------------------------------------------- diff --git a/newsfragments/38432.significant.rst b/newsfragments/38432.significant.rst new file mode 100644 index 0000000000000..b2b5c3fd2c6cd --- /dev/null +++ b/newsfragments/38432.significant.rst @@ -0,0 +1,9 @@ +Dataset triggers now provide extra as DAG Run conf. + +With data-aware scheduling you can automate triggers between DAGs. Passing context information was so far possible via +the name of the Dataset or by passing ``extra`` information to a downstream DAG. Retrieving the ``extra`` context +was only possible via the ``triggering_dataset_events`` context object and iterating over the details. With the new +version of Airflow the ``extra`` is merged into a dictionary and passed to the DAG directly as DAG Run conf so that +DAG params can directly be used. + +See :ref:`Dataset Event Extra pushed as DAG Run Config <datasets:event-extra>` for more information.