feat: Add JobDependenciesRunFacet to asset-triggered OL DAG events #59521

Merged
mobuchowski merged 1 commit into apache:main from kacpermuda:feat-ol-asset-events on Jan 9, 2026

@kacpermuda kacpermuda commented Dec 16, 2025

When a DagRun is triggered by Asset Events, OpenLineage (OL) has no visibility into which jobs emitted them. This PR adds JobDependenciesRunFacet to DAG events to carry that information. OL consumers can now trace exactly which runs the asset events came from, which makes root cause analysis (RCA) much easier.

For example, if DAG A is scheduled on two Assets, ASSET1 and ASSET2, and those assets are produced by tasks in other DAGs, the OL DAG event for DAG A will list the Asset Events consumed by this DagRun, along with the exact task runs and DagRuns of the other DAGs that produced them (including the OL run ID of each producing task run).

I've also added the option to build a JobDependency from AssetEvent.extra, so that users who emit Asset Events manually via the API can attach OpenLineage job information that will be included in the facet.

Example 1:

In this scenario, two DAGs are linked through an Asset. Currently OL already has information about the DAG schedule and the task outlets, but nothing more: the correlation exists only at the job level, not between actual runs.

from airflow import DAG, Asset
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime

sample_dataset = Asset("s3://my-bucket/sample-data.csv")
sample_dataset2 = Asset("gs://my-bucket/another_data.csv")

with DAG(
    dag_id="dag_asset_producer",
    schedule=None,
    catchup=False,
    start_date=datetime(2024, 1, 1)
) as dag:
    create_dataset_task = BashOperator(
        task_id="create_dataset",
        bash_command="sleep 1;",
        outlets=[sample_dataset],
    )

with DAG(
    dag_id="dag_asset_consumer2",
    schedule=[sample_dataset],
    catchup=False,
    start_date=datetime(2024, 1, 1)
) as consumer_dag2:

    process_data_task2 = BashOperator(
        task_id="process_dataset2",
        bash_command="sleep 1;",
    )

After this PR, the DAG events for dag_asset_consumer2 contain a facet like:

      "jobDependencies": {
        "_producer": "https://github.com/apache/airflow/tree/providers-openlineage/2.9.1",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/JobDependenciesRunFacet.json#/$defs/JobDependenciesRunFacet",
        "downstream": [],
        "upstream": [
          {
            "airflow": {
              "asset_events": [
                {
                  "asset_event_id": 1,
                  "asset_id": 2,
                  "asset_uri": "s3://my-bucket/sample-data.csv",
                  "dag_run_id": "manual__2025-12-17T18:46:43.457674+00:00"
                }
              ]
            },
            "dependency_type": "IMPLICIT_ASSET_DEPENDENCY",
            "job": {
              "name": "dag_asset_producer.create_dataset",
              "namespace": "default"
            },
            "run": {
              "runId": "019b2da2-d001-701f-b2c9-efbaf7c5fb72"
            }
          }
        ]
      },

So we have clear information about the dependency between jobs, the OL job and run identifiers, and metadata about the consumed AssetEvents, including Airflow-specific information such as the DagRun ID.
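To illustrate how an OL consumer might use this facet for RCA, here is a minimal sketch that flattens the upstream entries into one record per consumed asset event. The payload is copied from the example above and trimmed; the helper name `upstream_runs` and the record keys are assumptions made for illustration and are not part of the provider or of the OpenLineage client.

```python
import json

# Facet payload copied from the example above, trimmed to the fields used here.
facet_json = """
{
  "downstream": [],
  "upstream": [
    {
      "airflow": {
        "asset_events": [
          {
            "asset_event_id": 1,
            "asset_id": 2,
            "asset_uri": "s3://my-bucket/sample-data.csv",
            "dag_run_id": "manual__2025-12-17T18:46:43.457674+00:00"
          }
        ]
      },
      "dependency_type": "IMPLICIT_ASSET_DEPENDENCY",
      "job": {"name": "dag_asset_producer.create_dataset", "namespace": "default"},
      "run": {"runId": "019b2da2-d001-701f-b2c9-efbaf7c5fb72"}
    }
  ]
}
"""


def upstream_runs(facet: dict) -> list[dict]:
    """Hypothetical helper: one flat record per asset event in facet["upstream"]."""
    records = []
    for dep in facet.get("upstream", []):
        job = dep.get("job", {})
        run_id = dep.get("run", {}).get("runId")
        for event in dep.get("airflow", {}).get("asset_events", []):
            records.append(
                {
                    "job_namespace": job.get("namespace"),
                    "job_name": job.get("name"),
                    "run_id": run_id,
                    "asset_uri": event.get("asset_uri"),
                    "dag_run_id": event.get("dag_run_id"),
                }
            )
    return records


records = upstream_runs(json.loads(facet_json))
for record in records:
    print(record)
```

Each record pairs the producing OL job and run with the asset and the Airflow DagRun it came from, which is the run-level link that was previously missing.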

Example 2:

I think the real power of this feature shows up when a DAG is triggered by multiple assets, usually coming from DAGs running on different schedules. The example below was produced by the following code.

I ran the first asset producer twice, which wasn't enough to trigger the consumer, because it also requires the second asset. After running the second asset producer, we can see the information about all consumed assets and the respective OL job/run information for all 3 runs.

from airflow import DAG, Asset
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime

sample_dataset = Asset("s3://my-bucket/sample-data.csv")
sample_dataset2 = Asset("gs://my-bucket/another_data.csv")


with DAG(
    dag_id="dag_asset_producer",
    schedule=None,
    catchup=False,
    start_date=datetime(2024, 1, 1)
) as dag:
    create_dataset_task = BashOperator(
        task_id="create_dataset",
        bash_command="sleep 1;",
        outlets=[sample_dataset],
    )

with DAG(
    dag_id="dag_asset_producer2",
    schedule=None,
    catchup=False,
    start_date=datetime(2024, 1, 1)
) as dag2:
    create_dataset_task2 = BashOperator(
        task_id="create_dataset",
        bash_command="sleep 1;",
        outlets=[sample_dataset2],
    )

with DAG(
    dag_id="dag_asset_consumer",
    schedule=[sample_dataset, sample_dataset2],
    catchup=False,
    start_date=datetime(2024, 1, 1)
) as consumer_dag:

    process_data_task = BashOperator(
        task_id="process_dataset",
        bash_command="sleep 1;",
    )

Resulting facet in the DAG events for dag_asset_consumer:

      "jobDependencies": {
        "_producer": "https://github.com/apache/airflow/tree/providers-openlineage/2.9.1",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/JobDependenciesRunFacet.json#/$defs/JobDependenciesRunFacet",
        "downstream": [],
        "upstream": [
          {
            "airflow": {
              "asset_events": [
                {
                  "asset_event_id": 5,
                  "asset_id": 2,
                  "asset_uri": "s3://my-bucket/sample-data.csv",
                  "dag_run_id": "manual__2025-12-17T19:05:05.506294+00:00"
                }
              ]
            },
            "dependency_type": "IMPLICIT_ASSET_DEPENDENCY",
            "job": {
              "name": "dag_asset_producer.create_dataset",
              "namespace": "default"
            },
            "run": {
              "runId": "019b2db3-a0e2-707e-a544-85f54e8b1e71"
            }
          },
          {
            "airflow": {
              "asset_events": [
                {
                  "asset_event_id": 6,
                  "asset_id": 2,
                  "asset_uri": "s3://my-bucket/sample-data.csv",
                  "dag_run_id": "manual__2025-12-17T19:05:12.288534+00:00"
                }
              ]
            },
            "dependency_type": "IMPLICIT_ASSET_DEPENDENCY",
            "job": {
              "name": "dag_asset_producer.create_dataset",
              "namespace": "default"
            },
            "run": {
              "runId": "019b2db3-bb60-7086-90bf-4d1db0ee3b1f"
            }
          },
          {
            "airflow": {
              "asset_events": [
                {
                  "asset_event_id": 7,
                  "asset_id": 1,
                  "asset_uri": "gs://my-bucket/another_data.csv",
                  "dag_run_id": "manual__2025-12-17T19:05:19.196364+00:00"
                }
              ]
            },
            "dependency_type": "IMPLICIT_ASSET_DEPENDENCY",
            "job": {
              "name": "dag_asset_producer2.create_dataset",
              "namespace": "default"
            },
            "run": {
              "runId": "019b2db3-d65c-79f0-97c3-9cdacc9468c7"
            }
          }
        ]
      }
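As a rough sketch of how the multi-asset case could be consumed, the snippet below groups the producing OL run IDs by asset URI. The entries are copied from the facet above and trimmed to the fields used; the helper name `runs_by_asset` is hypothetical and only illustrates one way to read the facet.

```python
from collections import defaultdict

# Upstream entries from the facet above, trimmed to the fields used here.
upstream = [
    {
        "airflow": {"asset_events": [{"asset_event_id": 5, "asset_uri": "s3://my-bucket/sample-data.csv"}]},
        "job": {"name": "dag_asset_producer.create_dataset", "namespace": "default"},
        "run": {"runId": "019b2db3-a0e2-707e-a544-85f54e8b1e71"},
    },
    {
        "airflow": {"asset_events": [{"asset_event_id": 6, "asset_uri": "s3://my-bucket/sample-data.csv"}]},
        "job": {"name": "dag_asset_producer.create_dataset", "namespace": "default"},
        "run": {"runId": "019b2db3-bb60-7086-90bf-4d1db0ee3b1f"},
    },
    {
        "airflow": {"asset_events": [{"asset_event_id": 7, "asset_uri": "gs://my-bucket/another_data.csv"}]},
        "job": {"name": "dag_asset_producer2.create_dataset", "namespace": "default"},
        "run": {"runId": "019b2db3-d65c-79f0-97c3-9cdacc9468c7"},
    },
]


def runs_by_asset(entries: list[dict]) -> dict[str, list[str]]:
    """Hypothetical helper: map each asset URI to the OL run IDs that produced it."""
    grouped = defaultdict(list)
    for dep in entries:
        run_id = dep["run"]["runId"]
        for event in dep["airflow"]["asset_events"]:
            grouped[event["asset_uri"]].append(run_id)
    return dict(grouped)


grouped = runs_by_asset(upstream)
for uri, run_ids in grouped.items():
    print(f"{uri}: {len(run_ids)} producing run(s)")
```

For this payload the grouping shows two producing runs for the S3 asset and one for the GCS asset, matching the three producer runs described above.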


@kacpermuda kacpermuda force-pushed the feat-ol-asset-events branch 2 times, most recently from 6f450b1 to d413959 Compare December 17, 2025 18:58
@kacpermuda kacpermuda marked this pull request as ready for review December 17, 2025 18:58
@kacpermuda kacpermuda force-pushed the feat-ol-asset-events branch 2 times, most recently from a880546 to dc5ca00 Compare December 18, 2025 14:34
@kacpermuda kacpermuda force-pushed the feat-ol-asset-events branch 2 times, most recently from c5b06bf to 5c33850 Compare January 7, 2026 14:15
@kacpermuda kacpermuda force-pushed the feat-ol-asset-events branch from 5c33850 to a7a87bb Compare January 8, 2026 14:14
@mobuchowski mobuchowski merged commit 1950114 into apache:main Jan 9, 2026
127 checks passed