@amoghrajesh
Contributor


Was generative AI tooling used to co-author this PR?
  • No

Why?

Lineage collection is a task execution concern. On checking, it only runs in worker (Task SDK consumer) processes, not in any server component (scheduler, API server). I intend to move the lineage module from airflow-core to task-sdk as part of the ongoing client-server separation work.

Some more context:

  • io/path.py intercepts file I/O during task execution
  • OpenLineage listeners run in the worker process after task completion
  • It only orchestrates, never executes user code

What is done?

  • Created a new module in sdk: sdk/lineage.py
  • Moved all classes from airflow.lineage.hook
    • Updated imports to SDK equivalents:
      • ProvidersManager → ProvidersManagerTaskRuntime
      • airflow.utils.log.logging_mixin → airflow.sdk.definitions._internal.logging_mixin
    • Created get_hook_lineage_readers_plugins() using SDK's plugin discovery

Backward Compatibility

  1. For core:

    • Supports both from airflow.lineage.hook import X and from airflow.lineage import hook
  2. Provider compatibility has been handled with providers/common/compat/src/airflow/providers/common/compat/sdk.py

    • Added lineage classes to compat layer
    • Handles the DatasetLineageInfo → AssetLineageInfo rename (AF2 → AF3)
  3. Removed from core (airflow-core/src/airflow/plugins_manager.py):

    • Deleted unused get_hook_lineage_readers_plugins() function
    • Function only existed for core's old lineage implementation
  4. Provider developers are advised to import from airflow.providers.common.compat.sdk
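
To illustrate the idea behind a compat layer that covers both the module move and the AF2 → AF3 rename, here is a minimal sketch. It is not the code in providers/common/compat/sdk.py, whose implementation is not shown here. The helper name import_first_available and the candidate list in load_asset_lineage_info are hypothetical and chosen only for illustration.

```python
from __future__ import annotations

import importlib
from typing import Any, Iterable


def import_first_available(candidates: Iterable[tuple[str, str]]) -> Any:
    """Return the first attribute that can be imported.

    Each candidate is a (module_path, attribute_name) pair, tried in order.
    Hypothetical helper, not part of Airflow.
    """
    errors: list[str] = []
    for module_path, attr_name in candidates:
        try:
            module = importlib.import_module(module_path)
        except ImportError as exc:
            # Module does not exist in this installation, try the next one.
            errors.append(f"{module_path}: {exc}")
            continue
        try:
            return getattr(module, attr_name)
        except AttributeError:
            # Module exists but uses a different name (e.g. before a rename).
            errors.append(f"{module_path}: no attribute {attr_name!r}")
    raise ImportError("No candidate could be imported: " + "; ".join(errors))


def load_asset_lineage_info() -> Any:
    """Assumed lookup order: new SDK location, old core location, old AF2 name."""
    return import_first_available(
        (
            ("airflow.sdk.lineage", "AssetLineageInfo"),
            ("airflow.lineage.hook", "AssetLineageInfo"),
            ("airflow.lineage.hook", "DatasetLineageInfo"),
        )
    )
```

With a lookup like this, provider code would refer to a single name regardless of which Airflow version is installed, which is the property the compat module is meant to provide.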

Testing

To gain confidence, I tested a manual e2e scenario. I ran breeze with the OpenLineage (OL) integration:
breeze start-airflow --integration openlineage

DAG:

from __future__ import annotations

from datetime import datetime

from airflow.hooks.base import BaseHook
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG
from airflow.sdk.lineage import get_hook_lineage_collector


class SimpleHook(BaseHook):
    """Minimal hook that reports one input and one output asset."""

    def process(self):
        collector = get_hook_lineage_collector()

        # The hook instance is passed as the lineage context.
        collector.add_input_asset(self, uri="file:///input/data.csv")
        collector.add_output_asset(self, uri="file:///output/result.csv")

        # Prints the message ten times on one line.
        print("Lineage reported! " * 10)


def my_task():
    hook = SimpleHook()
    hook.process()


with DAG(
    dag_id="simple_lineage",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="run_hook",
        python_callable=my_task,
    )

It's a simple DAG that does the following:

  • Creates a custom hook (SimpleHook) that reports lineage information
  • Reports input dataset: file:///input/data.csv
  • Reports output dataset: file:///output/result.csv
  • Sends lineage to OpenLineage by using get_hook_lineage_collector() to register assets
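
For readers unfamiliar with the collector pattern, the following toy model shows roughly what "registering assets" means: the hook reports URIs to a shared collector, and a reader later pulls the accumulated inputs and outputs. This is a simplified stand-in written for illustration, not Airflow's actual collector. The class name ToyLineageCollector and its attributes are hypothetical.

```python
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass, field


@dataclass
class ToyLineageCollector:
    """Toy stand-in: counts how often each (uri, context) pair is reported."""

    inputs: Counter = field(default_factory=Counter)
    outputs: Counter = field(default_factory=Counter)

    def add_input_asset(self, context: object, uri: str) -> None:
        # Key on the reporting object so two hooks are tracked separately.
        self.inputs[(uri, id(context))] += 1

    def add_output_asset(self, context: object, uri: str) -> None:
        self.outputs[(uri, id(context))] += 1

    @property
    def input_uris(self) -> list[str]:
        # Distinct input URIs, sorted for stable output.
        return sorted({uri for uri, _ in self.inputs})

    @property
    def output_uris(self) -> list[str]:
        # Distinct output URIs, sorted for stable output.
        return sorted({uri for uri, _ in self.outputs})


class ToyHook:
    """Plays the role of SimpleHook in the DAG above."""

    def process(self, collector: ToyLineageCollector) -> None:
        collector.add_input_asset(self, uri="file:///input/data.csv")
        collector.add_output_asset(self, uri="file:///output/result.csv")
```

In the real setup the collector is obtained through get_hook_lineage_collector() rather than passed in, and a listener such as the OpenLineage one reads the collected assets after the task finishes.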

DAG run: [screenshot]

Marquez: [screenshots]

@amoghrajesh amoghrajesh requested a review from uranusjr January 23, 2026 10:07
@amoghrajesh amoghrajesh self-assigned this Jan 23, 2026
@amoghrajesh amoghrajesh added the full tests needed We need to run full set of tests for this PR to merge label Jan 23, 2026
@amoghrajesh amoghrajesh reopened this Jan 23, 2026
Contributor

@mobuchowski mobuchowski left a comment

That looks good to me, thanks @amoghrajesh!
A couple of tests need fixing, though.

Comment on lines +24 to +25
from airflow.sdk import BaseHook, plugins_manager
from airflow.sdk.definitions.asset import Asset
Member

Suggested change
- from airflow.sdk import BaseHook, plugins_manager
- from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk import Asset, BaseHook, plugins_manager

Labels

area:dev-tools, area:lineage, area:plugins, area:providers, area:task-sdk, backport-to-v3-1-test, full tests needed, provider:common-compat, provider:openlineage, AIP-53
