Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
validator=[attr.validators.min_len(1), attr.validators.max_len(3000)],
)
extra: dict[str, Any] | None = None
"""Static extra dictionary to be passed with event."""
extra_from_return: bool = False
"""Use task return value as extra value."""

__version__: ClassVar[int] = 1

Expand Down
33 changes: 24 additions & 9 deletions airflow/example_dags/example_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,24 @@

from __future__ import annotations

import random

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
# [START dataset_def_extra_return]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra_from_return=True)
# [END dataset_def_extra_return]
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
Expand All @@ -84,7 +90,11 @@
schedule=None,
tags=["produces", "dataset-scheduled"],
) as dag2:
BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5")

def some_python_callable():
return {"sleep_seconds": random.randint(1, 10), "some_context": "dynamic data 123"}

PythonOperator(python_callable=some_python_callable, outlets=[dag2_dataset], task_id="producing_task_2")

# [START dag_dep]
with DAG(
Expand All @@ -107,11 +117,12 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[dag1_dataset, dag2_dataset],
tags=["consumes", "dataset-scheduled"],
params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")},
) as dag4:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consuming_2",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)

with DAG(
Expand All @@ -127,7 +138,7 @@
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consuming_3",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)

with DAG(
Expand All @@ -143,38 +154,41 @@
BashOperator(
task_id="unrelated_task",
outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)

with DAG(
dag_id="consume_1_and_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset & dag2_dataset),
params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")},
) as dag5:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_and_2_with_dataset_expressions",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)
with DAG(
dag_id="consume_1_or_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | dag2_dataset),
params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")},
) as dag6:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_2_with_dataset_expressions",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)
with DAG(
dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")},
) as dag7:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)
with DAG(
dag_id="conditional_dataset_and_time_based_timetable",
Expand All @@ -184,9 +198,10 @@
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
),
tags=["dataset-time-based-timetable"],
params={"sleep_seconds": Param(5, "A random number to sleep. Set by dataset 2 event data.")},
) as dag8:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="conditional_dataset_and_time_based_timetable",
bash_command="sleep 5",
bash_command="sleep {{ params.sleep_seconds }}",
)
8 changes: 7 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ def _create_dag_runs_dataset_triggered(
if previous_dag_run:
dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)

dataset_events = session.scalars(
dataset_events: Collection[DatasetEvent] = session.scalars(
select(DatasetEvent)
.join(
DagScheduleDatasetReference,
Expand All @@ -1282,9 +1282,15 @@ def _create_dag_runs_dataset_triggered(
events=dataset_events,
)

run_conf = {}
for event in dataset_events:
if event.extra:
run_conf.update(event.extra)
Comment on lines +1285 to +1288

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer we put this in a separate PR to discuss. It’s not entirely obvious how extras should be merged from different events that trigger the run.


dag_run = dag.create_dagrun(
run_id=run_id,
run_type=DagRunType.DATASET_TRIGGERED,
conf=run_conf,
execution_date=exec_date,
data_interval=data_interval,
state=DagRunState.QUEUED,
Expand Down
20 changes: 16 additions & 4 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2425,6 +2425,7 @@ def _run_raw_task(
self.test_mode = test_mode
self.refresh_from_task(self.task, pool_override=pool)
self.refresh_from_db(session=session)
result = None

self.job_id = job_id
self.hostname = get_hostname()
Expand Down Expand Up @@ -2455,7 +2456,7 @@ def _run_raw_task(

try:
if not mark_success:
self._execute_task_with_callbacks(context, test_mode, session=session)
result = self._execute_task_with_callbacks(context, test_mode, session=session)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It seems that not all operators return a result on execution (example: DatabricksSubmitRunOperator)
  2. Some operators do return a result - like (GlueOperator, EmrServerlessStartJobOperator), but the return value is job_id in these scenarios.

Questions -

  1. If any of the above operators publish an Airflow dataset, how to specify extra dictionary in the corresponding dataset?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding 1+2: Yes, if no result is generated, the content will be just None. If it is a scalar like a job_id it is rather a string. So in such cases the output is not usable for passing along to the dataset event.

Regarding your question 1: The publish of the event actually is happening after this line of code. The change was attemping to capture the result to put is as extra a few lines of code later from here.

if not test_mode:
self.refresh_from_db(lock_for_update=True, session=session)
self.state = TaskInstanceState.SUCCESS
Expand Down Expand Up @@ -2543,7 +2544,7 @@ def _run_raw_task(
session.add(Log(self.state, self))
session.merge(self).task = self.task
if self.state == TaskInstanceState.SUCCESS:
self._register_dataset_changes(session=session)
self._register_dataset_changes(result, session=session)

session.commit()
if self.state == TaskInstanceState.SUCCESS:
Expand All @@ -2553,21 +2554,30 @@ def _run_raw_task(

return None

def _register_dataset_changes(self, *, session: Session) -> None:
def _register_dataset_changes(self, result: Any, *, session: Session) -> None:
if TYPE_CHECKING:
assert self.task

for obj in self.task.outlets or []:
self.log.debug("outlet obj %s", obj)
# Lineage can have other types of objects besides datasets
if isinstance(obj, Dataset):
if obj.extra:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge the event static information with dynamic information?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we should not :-(
I (previously) interpreted the extrafield as being dynamic data. But as it is (actually) intended to be meta data for the dataset itself I mis-interpreted this as an option to pass extra data for this "reference" to a Dataset (interpreted by the URI).

extra = obj.extra
Comment on lines +2565 to +2566

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this is right. It’s made quite clear in previous issues that Dataset.extras and DatasetEvent.extras are different things and should be kept separate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, then it seems I mis-understood your comments in the previous PR7discussions about this. Thanks for clearly documenting the differences between Dataset and DatasetEvent(both) extra details in #38481. This opened my eyes and now I understand your push-back.
Now it is "clear" to me. Intent was not to overwrite or mangle "static" information for events.

elif obj.extra_from_return:
extra = result if isinstance(result, dict) else {str(self.task_id): result}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automatically putting the value under a key is too magical to me. I would prefer this either just forward the value (the extra field is capable of storing any JSON-able values, after all), or skip the value entirely if it has an unexpected type. The task ID is also not a particularly obvious key either.

else:
extra = None
dataset_manager.register_dataset_change(
task_instance=self,
dataset=obj,
extra=extra,
session=session,
)

def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session):
def _execute_task_with_callbacks(
self, context: Context, test_mode: bool = False, *, session: Session
) -> Any:
"""Prepare Task for Execution."""
from airflow.models.renderedtifields import RenderedTaskInstanceFields

Expand Down Expand Up @@ -2658,6 +2668,8 @@ def signal_handler(signum, frame):
Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type})
Stats.incr("ti_successes", tags=self.stats_tags)

return result

def _execute_task(self, context, task_orig):
"""
Execute Task (optionally with a Timeout) and push Xcom results.
Expand Down
24 changes: 24 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ This extra information does not affect a dataset's identity. This means a DAG wi
...,
)

In cases the published extra information should be dynamic, you can also set the ``extra_from_return`` to True. In this case the return value of your executing task will be used as extra information to be published:

.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
:language: python
:start-after: [START dataset_def_extra_return]
:end-before: [END dataset_def_extra_return]

The extra information is expected to be a dictionary, if the task is not returning a dictionary, then the value is pushed into a dictionary with the task id as key.

.. note:: **Security Note:** Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!

How to use datasets in your DAGs
Expand Down Expand Up @@ -224,6 +233,21 @@ If one dataset is updated multiple times before all consumed datasets have been

}

.. _datasets:event-extra:

Dataset Event Extra pushed as DAG Run Config
--------------------------------------------

.. versionadded:: 2.10.0

Tasks producing events via ``outlets`` parameter can define extra information in form of a static dictionary.
To make this extra information easy to consume, the information from on or multiple events is merged to a common dictionary
per default and provided as :ref:`DAG run configuration <dagrun:parameters>` to the receiving DAG. This means such data can
directly be used by consuming :ref:`DAGs as parameter <concepts:params>`.

As during the merge of extra information some information might be over-ridden you either need to use unique keys or use the
``triggering_dataset_events`` as described below.

Fetching information from a Triggering Dataset Event
----------------------------------------------------

Expand Down
9 changes: 9 additions & 0 deletions newsfragments/38432.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Dataset triggers now provide extra as DAG Run conf.

With data aware scheduling you can automate triggers between DAGs. passing context information was so far possible via
the name of the Dataset or by passing ``extra`` information to a downstream DAG. Retrieving the ``extra`` context
was only possible via the ``triggering_dataset_events`` context object and iterating over the details. With the new
version of Airflow the ``extra`` is merged into a dictionary and passed to the DAG directly as DAG Run conf so that
DAG params can directly be used.

See :ref:`Dataset Event Extra pushed as DAG Run Config <datasets:event-extra>` for more information.