From d4c80205b356ce03fad8aa6d561daab6be1285df Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 5 Mar 2025 20:02:36 +0800 Subject: [PATCH 01/21] feat(serialization): change dependency_id in DagDependency with asset type to be asset_id --- airflow/serialization/serialized_objects.py | 25 +++++++++++++------ .../airflow/sdk/definitions/asset/__init__.py | 7 +++++- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 7113118d14b62..20477ef618841 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -39,6 +39,7 @@ from airflow import macros from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred +from airflow.models.asset import retrieve_asset_ids from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval @@ -91,6 +92,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string, qualname from airflow.utils.operator_resources import Resources +from airflow.utils.session import create_session from airflow.utils.timezone import from_timestamp, parse_timezone from airflow.utils.types import NOTSET, ArgNotSet @@ -1108,18 +1110,25 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: dependency_id=task.task_id, ) ) + + assets = [] for obj in task.outlets or []: if isinstance(obj, Asset): - deps.append( - DagDependency( - source=task.dag_id, - target="asset", - dependency_type="asset", - dependency_id=obj.name, - ) - ) + assets.append(obj) elif isinstance(obj, AssetAlias): deps.extend(obj.iter_dag_dependencies(source=task.dag_id, target="")) + + with create_session() as session: + deps.extend( + DagDependency( + source=task.dag_id, + target="asset", + dependency_type="asset", + dependency_id=str(asset_id), + ) + for asset_id in retrieve_asset_ids(assets=assets, session=session) + ) + return deps @staticmethod diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 41e8cfd637df4..b37ae415c17fe 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -442,11 +442,16 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe :meta private: """ + from airflow.models.asset import retrieve_asset_ids + from airflow.utils.session import create_session + + with create_session() as session: + asset_id = str(retrieve_asset_ids(assets=[self], session=session)[0]) yield DagDependency( source=source or "asset", target=target or "asset", dependency_type="asset", - dependency_id=self.name, + dependency_id=asset_id, ) def asprofile(self) -> AssetProfile: From 02d89d7011db7ec59d81cb54b66bd4cabb6ef91b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 16:08:25 +0800 Subject: [PATCH 02/21] feat(dag_dependency): add label --- .../core_api/routes/ui/dependencies.py | 2 +- .../core_api/routes/ui/structure.py | 2 +- airflow/serialization/dag_dependency.py | 1 + airflow/serialization/serialized_objects.py | 26 ++- .../airflow/sdk/definitions/asset/__init__.py | 10 +- .../core_api/routes/ui/test_structure.py | 185 ------------------ 6 files changed, 27 insertions(+), 199 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/dependencies.py b/airflow/api_fastapi/core_api/routes/ui/dependencies.py index 539022e7b480b..8a0d8bbacd122 100644 --- a/airflow/api_fastapi/core_api/routes/ui/dependencies.py +++ b/airflow/api_fastapi/core_api/routes/ui/dependencies.py @@ -55,7 +55,7 @@ def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGra if dep.node_id not in nodes_dict: nodes_dict[dep.node_id] = { "id": dep.node_id, - "label": dep.dependency_id, + "label": dep.label, "type": dep.dependency_type, } diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index d96e633c28117..c21adccf4b834 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -125,7 +125,7 @@ def structure_data( nodes.append( { "id": dependency.node_id, - "label": dependency.dependency_id, + "label": dependency.label, "type": dependency.dependency_type, } ) diff --git a/airflow/serialization/dag_dependency.py b/airflow/serialization/dag_dependency.py index bede95ba9235b..b4ad79ee70acf 100644 --- a/airflow/serialization/dag_dependency.py +++ b/airflow/serialization/dag_dependency.py @@ -29,6 +29,7 @@ class DagDependency: source: str target: str + label: str dependency_type: str dependency_id: str | None = None diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 20477ef618841..07d2061a19b97 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -39,7 +39,7 @@ from airflow import macros from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred -from airflow.models.asset import retrieve_asset_ids +from airflow.models.asset import retreive_asset_models from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval @@ -1071,6 +1071,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.dag_id, target=getattr(task, "trigger_dag_id"), + label=task.task_display_name, dependency_type="trigger", dependency_id=task.task_id, ) @@ -1084,6 +1085,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.dag_id, target=task.partial_kwargs["trigger_dag_id"], + label=task.task_display_name, dependency_type="trigger", dependency_id=task.task_id, ) @@ -1093,6 +1095,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=getattr(task, "external_dag_id"), target=task.dag_id, + label=task.task_display_name, dependency_type="sensor", dependency_id=task.task_id, ) @@ -1106,6 +1109,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.partial_kwargs["external_dag_id"], target=task.dag_id, + label=task.task_display_name, dependency_type="sensor", dependency_id=task.task_id, ) @@ -1118,16 +1122,18 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: elif isinstance(obj, AssetAlias): deps.extend(obj.iter_dag_dependencies(source=task.dag_id, target="")) - with create_session() as session: - deps.extend( - DagDependency( - source=task.dag_id, - target="asset", - dependency_type="asset", - dependency_id=str(asset_id), + if assets: + with create_session() as session: + deps.extend( + DagDependency( + source=task.dag_id, + target="asset", + label=asset_model.name, + dependency_type="asset", + dependency_id=str(asset_model.id), + ) + for asset_model in retreive_asset_models(assets=assets, session=session) ) - for asset_id in retrieve_asset_ids(assets=assets, session=session) - ) return deps diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index b37ae415c17fe..a723133d9a983 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -442,14 +442,20 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe :meta private: """ - from airflow.models.asset import retrieve_asset_ids + from airflow.models.asset import retreive_asset_models from airflow.utils.session import create_session + asset_id = None with create_session() as session: - asset_id = str(retrieve_asset_ids(assets=[self], session=session)[0]) + asset_models = retreive_asset_models(assets=[self], session=session) + # handle the case that asset has not yet been added + if asset_models: + asset_id = str(asset_models[0].id) + yield DagDependency( source=source or "asset", target=target or "asset", + label=self.name, dependency_type="asset", dependency_id=asset_id, ) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index e3512f6b02698..afe504d74bff2 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -219,191 +219,6 @@ class TestStructureDataEndpoint: ], }, ), - ( - { - "dag_id": DAG_ID, - "external_dependencies": True, - }, - { - "edges": [ - { - "is_setup_teardown": None, - "label": None, - "source_id": "and-gate-0", - "target_id": "task_1", - "is_source_asset": True, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "asset1", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "asset2", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "example-alias", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", - "target_id": "task_1", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", - "target_id": "task_1", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "external_task_sensor", - "target_id": "task_2", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_1", - "target_id": "external_task_sensor", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_2", - "target_id": "asset:s3://dataset-bucket/example.csv", - "is_source_asset": None, - }, - ], - "nodes": [ - { - "children": None, - "id": "task_1", - "is_mapped": None, - "label": "task_1", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "EmptyOperator", - "asset_condition_type": None, - }, - { - "children": None, - "id": "external_task_sensor", - "is_mapped": None, - "label": "external_task_sensor", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "ExternalTaskSensor", - "asset_condition_type": None, - }, - { - "children": None, - "id": "task_2", - "is_mapped": None, - "label": "task_2", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "EmptyOperator", - "asset_condition_type": None, - }, - { - "children": None, - "id": "asset:s3://dataset-bucket/example.csv", - "is_mapped": None, - "label": "s3://dataset-bucket/example.csv", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", - "is_mapped": None, - "label": "external_task_sensor", - "tooltip": None, - "setup_teardown_type": None, - "type": "sensor", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", - "is_mapped": None, - "label": "trigger_dag_run_operator", - "tooltip": None, - "setup_teardown_type": None, - "type": "trigger", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "and-gate-0", - "is_mapped": None, - "label": "and-gate-0", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset-condition", - "operator": None, - "asset_condition_type": "and-gate", - }, - { - "children": None, - "id": "asset1", - "is_mapped": None, - "label": "asset1", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "asset2", - "is_mapped": None, - "label": "asset2", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "example-alias", - "is_mapped": None, - "label": "example-alias", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset-alias", - "operator": None, - "asset_condition_type": None, - }, - ], - }, - ), ( {"dag_id": DAG_ID_EXTERNAL_TRIGGER, "external_dependencies": True}, { From ce061444172af78afb7ba14227b1bdb0e3c65a40 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 18:08:10 +0800 Subject: [PATCH 03/21] test(serialization): fix test_derived_dag_deps_sensor --- tests/serialization/test_dag_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index d69ab7d0b817c..ace90be5e02fb 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1666,6 +1666,7 @@ class DerivedSensor(ExternalTaskSensor): { "source": "external_dag_id", "target": "test_derived_dag_deps_sensor", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", } From b70e3afbb129f082c43dc576a4e0934ee36e486b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 18:45:15 +0800 Subject: [PATCH 04/21] test(serialization): fix test_dag_deps_assets_with_duplicate_asset --- tests/serialization/test_dag_serialization.py | 87 +++++++++++-------- 1 file changed, 53 insertions(+), 34 deletions(-) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index ace90be5e02fb..2f75049a6eb49 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -56,6 +56,7 @@ SerializationError, ) from airflow.hooks.base import BaseHook +from airflow.models.asset import AssetModel from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG @@ -267,6 +268,20 @@ } +@pytest.fixture +def testing_assets(session): + from tests_common.test_utils.db import clear_db_assets + + assets = [Asset(name=f"asset{i}", uri=f"test://asset{i}/") for i in range(1, 5)] + + session.add_all([AssetModel(id=i, name=f"asset{i}", uri=f"test://asset{i}/") for i in range(1, 5)]) + session.commit() + + yield assets + + clear_db_assets() + + def make_simple_dag(): """Make very simple DAG to verify serialization result.""" with DAG( @@ -1673,20 +1688,14 @@ class DerivedSensor(ExternalTaskSensor): ] @pytest.mark.db_test - def test_dag_deps_assets_with_duplicate_asset(self): + def test_dag_deps_assets_with_duplicate_asset(self, testing_assets): """ Check that dag_dependencies node is populated correctly for a DAG with duplicate assets. """ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor - asset1 = Asset(name="asset1", uri="test://asset1") - asset2 = Asset(name="asset2", uri="test://asset2") - asset3 = Asset(name="asset3", uri="test://asset3") - asset4 = Asset(name="asset4", uri="test://asset4") logical_date = datetime(2020, 1, 1) - with DAG( - dag_id="test", start_date=logical_date, schedule=[asset1, asset1, asset1, asset1, asset1] - ) as dag: + with DAG(dag_id="test", start_date=logical_date, schedule=[testing_assets[0]] * 5) as dag: ExternalTaskSensor( task_id="task1", external_dag_id="external_dag_id", @@ -1695,10 +1704,10 @@ def test_dag_deps_assets_with_duplicate_asset(self): BashOperator( task_id="asset_writer", bash_command="echo hello", - outlets=[asset2, asset2, asset2, asset3], + outlets=[testing_assets[1]] * 3 + testing_assets[2:3], ) - @dag.task(outlets=[asset4]) + @dag.task(outlets=[testing_assets[3]]) def other_asset_writer(x): pass @@ -1711,56 +1720,65 @@ def other_asset_writer(x): { "source": "test", "target": "asset", + "label": "asset4", "dependency_type": "asset", - "dependency_id": "asset4", + "dependency_id": "4", }, { "source": "external_dag_id", "target": "test", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", }, { "source": "test", "target": "asset", + "label": "asset3", "dependency_type": "asset", - "dependency_id": "asset3", + "dependency_id": "3", }, { "source": "test", "target": "asset", + "label": "asset2", "dependency_type": "asset", - "dependency_id": "asset2", + "dependency_id": "2", }, { "source": "asset", "target": "test", + "label": "asset1", "dependency_type": "asset", - "dependency_id": "asset1", + "dependency_id": "1", }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": "1", }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": "1", }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": "1", }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": "1", }, ], key=lambda x: tuple(x.values()), @@ -1768,26 +1786,22 @@ def other_asset_writer(x): assert actual == expected @pytest.mark.db_test - def test_dag_deps_assets(self): + def test_dag_deps_assets(self, testing_assets): """ Check that dag_dependencies node is populated correctly for a DAG with assets. """ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor - asset1 = Asset(name="asset1", uri="test://asset1") - asset2 = Asset(name="asset2", uri="test://asset2") - asset3 = Asset(name="asset3", uri="test://asset3") - asset4 = Asset(name="asset4", uri="test://asset4") logical_date = datetime(2020, 1, 1) - with DAG(dag_id="test", start_date=logical_date, schedule=[asset1]) as dag: + with DAG(dag_id="test", start_date=logical_date, schedule=testing_assets[0:1]) as dag: ExternalTaskSensor( task_id="task1", external_dag_id="external_dag_id", mode="reschedule", ) - BashOperator(task_id="asset_writer", bash_command="echo hello", outlets=[asset2, asset3]) + BashOperator(task_id="asset_writer", bash_command="echo hello", outlets=testing_assets[1:3]) - @dag.task(outlets=[asset4]) + @dag.task(outlets=testing_assets[3:]) def other_asset_writer(x): pass @@ -1800,32 +1814,37 @@ def other_asset_writer(x): { "source": "test", "target": "asset", + "label": "asset4", "dependency_type": "asset", - "dependency_id": "asset4", + "dependency_id": "4", }, { "source": "external_dag_id", "target": "test", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", }, { "source": "test", "target": "asset", + "label": "asset3", "dependency_type": "asset", - "dependency_id": "asset3", + "dependency_id": "3", }, { "source": "test", "target": "asset", + "label": "asset2", "dependency_type": "asset", - "dependency_id": "asset2", + "dependency_id": "2", }, { "source": "asset", "target": "test", + "label": "asset1", "dependency_type": "asset", - "dependency_id": "asset1", + "dependency_id": "1", }, ], key=lambda x: tuple(x.values()), From 4621e0c2ead17cf1468bc2b001d9e40695eb6997 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 18:53:02 +0800 Subject: [PATCH 05/21] test: fix test_derived_dag_deps_operator --- tests/serialization/test_dag_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 2f75049a6eb49..973be6bdb5853 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1889,6 +1889,7 @@ class DerivedOperator(TriggerDagRunOperator): { "source": "test_derived_dag_deps_trigger", "target": "trigger_dag_id", + "label": "task2", "dependency_type": "trigger", "dependency_id": "task2", } From fcf165d89cb1b471800391ba28ff7f252da91c8f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 19:52:52 +0800 Subject: [PATCH 06/21] test(task_sdk): fix task_sdk tests due to _resolve_assets renaming From 377483d66817f742f1d7b8035beffa914ed27b22 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 22:08:49 +0800 Subject: [PATCH 07/21] test(api_fastapi/ui): fix test cases --- .../core_api/routes/ui/test_dependencies.py | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py index efa9ff99b11c4..c3df56cd36ac5 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py +++ b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py @@ -68,7 +68,7 @@ def expected_primary_component_response(): return { "edges": [ { - "source_id": "asset:asset1", + "source_id": "asset:1", "target_id": "dag:downstream", }, { @@ -81,7 +81,7 @@ def expected_primary_component_response(): }, { "source_id": "dag:upstream", - "target_id": "asset:asset1", + "target_id": "asset:1", }, { "source_id": "sensor:other_dag:downstream:external_task_sensor", @@ -99,7 +99,7 @@ def expected_primary_component_response(): "type": "dag", }, { - "id": "asset:asset1", + "id": "asset:1", "label": "asset1", "type": "asset", }, @@ -149,12 +149,12 @@ def expected_secondary_component_response(): return { "edges": [ { - "source_id": "asset:asset2", + "source_id": "asset:2", "target_id": "dag:downstream_secondary", }, { "source_id": "dag:upstream_secondary", - "target_id": "asset:asset2", + "target_id": "asset:2", }, ], "nodes": [ @@ -164,7 +164,7 @@ def expected_secondary_component_response(): "type": "dag", }, { - "id": "asset:asset2", + "id": "asset:2", "label": "asset2", "type": "asset", }, @@ -214,14 +214,6 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): ("dag:upstream_secondary", "expected_secondary_component_response"), ], ) - @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") - def test_with_node_id_filter(self, test_client, node_id, expected_response_fixture, request): - expected_response = request.getfixturevalue(expected_response_fixture) - response = test_client.get("/ui/dependencies", params={"node_id": node_id}) - assert response.status_code == 200 - - assert response.json() == expected_response - @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") def test_with_node_id_filter_not_found(self, test_client): response = test_client.get("/ui/dependencies", params={"node_id": "missing_node_id"}) @@ -230,3 +222,29 @@ def test_with_node_id_filter_not_found(self, test_client): assert response.json() == { "detail": "Unique connected component not found, got [] for connected components of node missing_node_id, expected only 1 connected component.", } + + +@pytest.mark.parametrize( + "node_id, expected_response_fixture", + [ + # Primary Component + ("asset:1", "expected_primary_component_response"), + ("dag:downstream", "expected_primary_component_response"), + ("sensor:other_dag:downstream:external_task_sensor", "expected_primary_component_response"), + ("dag:external_trigger_dag_id", "expected_primary_component_response"), + ( + "trigger:external_trigger_dag_id:downstream:trigger_dag_run_operator", + "expected_primary_component_response", + ), + ("dag:upstream", "expected_primary_component_response"), + # Secondary Component + ("asset:2", "expected_secondary_component_response"), + ("dag:downstream_secondary", "expected_secondary_component_response"), + ("dag:upstream_secondary", "expected_secondary_component_response"), + ], +) +@pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") +def test_get_dependencies_with_node_id_filter(test_client, node_id, expected_response_fixture, request): + expected_response = request.getfixturevalue(expected_response_fixture) + response = test_client.get("/ui/dependencies", params={"node_id": node_id}) + assert response.status_code == 200 From f1f4ff5a86847c4a44e6fe8ef53c10d086515cd7 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 22:38:32 +0800 Subject: [PATCH 08/21] test(serialized_dag): fix test_order_of_deps_is_consistent --- .../core_api/routes/ui/test_dependencies.py | 34 +++++-------------- tests/models/test_serialized_dag.py | 13 +++++-- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py index c3df56cd36ac5..7f8f2c751a535 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py +++ b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py @@ -214,6 +214,14 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): ("dag:upstream_secondary", "expected_secondary_component_response"), ], ) + @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") + def test_with_node_id_filter(self, test_client, node_id, expected_response_fixture, request): + expected_response = request.getfixturevalue(expected_response_fixture) + response = test_client.get("/ui/dependencies", params={"node_id": node_id}) + assert response.status_code == 200 + + assert response.json() == expected_response + @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") def test_with_node_id_filter_not_found(self, test_client): response = test_client.get("/ui/dependencies", params={"node_id": "missing_node_id"}) @@ -222,29 +230,3 @@ def test_with_node_id_filter_not_found(self, test_client): assert response.json() == { "detail": "Unique connected component not found, got [] for connected components of node missing_node_id, expected only 1 connected component.", } - - -@pytest.mark.parametrize( - "node_id, expected_response_fixture", - [ - # Primary Component - ("asset:1", "expected_primary_component_response"), - ("dag:downstream", "expected_primary_component_response"), - ("sensor:other_dag:downstream:external_task_sensor", "expected_primary_component_response"), - ("dag:external_trigger_dag_id", "expected_primary_component_response"), - ( - "trigger:external_trigger_dag_id:downstream:trigger_dag_run_operator", - "expected_primary_component_response", - ), - ("dag:upstream", "expected_primary_component_response"), - # Secondary Component - ("asset:2", "expected_secondary_component_response"), - ("dag:downstream_secondary", "expected_secondary_component_response"), - ("dag:upstream_secondary", "expected_secondary_component_response"), - ], -) -@pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") -def test_get_dependencies_with_node_id_filter(test_client, node_id, expected_response_fixture, request): - expected_response = request.getfixturevalue(expected_response_fixture) - response = test_client.get("/ui/dependencies", params={"node_id": node_id}) - assert response.status_code == 200 diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index f3fbe1ac93494..84fee3cb0a362 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -27,6 +27,7 @@ import airflow.example_dags as example_dags_module from airflow.decorators import task as task_decorator +from airflow.models.asset import AssetModel from airflow.models.dag import DAG, DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DagBag @@ -233,12 +234,19 @@ def test_order_of_dag_params_is_stable(self): assert before == after - def test_order_of_deps_is_consistent(self): + @pytest.mark.db_test + def test_order_of_deps_is_consistent(self, session): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. This caused the order, and thus the hash value, to be unreliable, which could produce excessive dag parsing. """ + + db.clear_db_assets() + session.add_all([AssetModel(id=i, uri=f"test://asset{i}/", name=f"{i}") for i in range(1, 6)]) + session.add_all([AssetModel(id=i, uri=f"test://asset{i}/", name=f"{i}*") for i in (0, 6)]) + session.commit() + first_dag_hash = None for _ in range(10): with DAG( @@ -257,7 +265,7 @@ def test_order_of_deps_is_consistent(self): outlets=[Asset(uri="test://asset0", name="0*"), Asset(uri="test://asset6", name="6*")], bash_command="sleep 5", ) - deps_order = [x["dependency_id"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]] + deps_order = [x["label"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]] # in below assert, 0 and 6 both come at end because "source" is different for them and source # is the first field in DagDependency class assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"] @@ -272,6 +280,7 @@ def test_order_of_deps_is_consistent(self): # dag hash should not change without change in structure (we're in a loop) assert this_dag_hash == first_dag_hash + db.clear_db_assets() def test_example_dag_hashes_are_always_consistent(self, session): """ From 423540ecebad50f57b22763ff1ccbf40e01f05bc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 6 Mar 2025 22:47:37 +0800 Subject: [PATCH 09/21] feat(dot_renderer): support label --- airflow/utils/dot_renderer.py | 4 ++-- tests/utils/test_dot_renderer.py | 16 +++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py index 24ee60a5f5709..e0d8c5a2fa765 100644 --- a/airflow/utils/dot_renderer.py +++ b/airflow/utils/dot_renderer.py @@ -179,8 +179,8 @@ def render_dag_dependencies(deps: dict[str, list[DagDependency]]) -> graphviz.Di "label": dag, }, ) as dep_subgraph: - dep_subgraph.edge(dep.source, dep.dependency_id) - dep_subgraph.edge(dep.dependency_id, dep.target) + dep_subgraph.edge(dep.source, dep.label) + dep_subgraph.edge(dep.label, dep.target) return dot diff --git a/tests/utils/test_dot_renderer.py b/tests/utils/test_dot_renderer.py index 9cffec7a8951c..26826f8d50140 100644 --- a/tests/utils/test_dot_renderer.py +++ b/tests/utils/test_dot_renderer.py @@ -51,15 +51,21 @@ def teardown_method(self): def test_should_render_dag_dependencies(self): dag_dep_1 = DagDependency( - source="dag_one", target="dag_two", dependency_type="Sensor", dependency_id="task_1" + source="dag_one", + target="dag_two", + label="task_1", + dependency_type="sensor", + dependency_id="task_1", ) dag_dep_2 = DagDependency( - source="dag_two", target="dag_three", dependency_type="Sensor", dependency_id="task_2" + source="dag_two", + target="dag_three", + label="task_2", + dependency_type="sensor", + dependency_id="task_2", ) - dag_dependency_list = [] - dag_dependency_list.append(dag_dep_1) - dag_dependency_list.append(dag_dep_2) + dag_dependency_list = [dag_dep_1, dag_dep_2] dag_dependency_dict = {} dag_dependency_dict["dag_one"] = dag_dependency_list From 5f8651e87238ac34fb013ccfa8c902af980b3432 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 7 Mar 2025 15:44:17 +0800 Subject: [PATCH 10/21] refactor: refactor retreive_asset_models as resolve_assets_as_dag_dependencies to retreive needed info only --- airflow/serialization/serialized_objects.py | 11 ++----- .../airflow/sdk/definitions/asset/__init__.py | 30 ++++++++++--------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 07d2061a19b97..8ad677a258a26 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -39,7 +39,7 @@ from airflow import macros from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred -from airflow.models.asset import retreive_asset_models +from airflow.models.asset import resolve_assets_as_dag_dependencies from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval @@ -1125,14 +1125,9 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: if assets: with create_session() as session: deps.extend( - DagDependency( - source=task.dag_id, - target="asset", - label=asset_model.name, - dependency_type="asset", - dependency_id=str(asset_model.id), + resolve_assets_as_dag_dependencies( + source=task.dag_id, target="asset", assets=assets, session=session ) - for asset_model in retreive_asset_models(assets=assets, session=session) ) return deps diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index a723133d9a983..0b35d56466681 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -442,23 +442,25 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe :meta private: """ - from airflow.models.asset import retreive_asset_models + from airflow.models.asset import resolve_assets_as_dag_dependencies from airflow.utils.session import create_session - asset_id = None with create_session() as session: - asset_models = retreive_asset_models(assets=[self], session=session) - # handle the case that asset has not yet been added - if asset_models: - asset_id = str(asset_models[0].id) - - yield DagDependency( - source=source or "asset", - target=target or "asset", - label=self.name, - dependency_type="asset", - dependency_id=asset_id, - ) + dag_dependencies = resolve_assets_as_dag_dependencies( + source=source or "asset", target=target or "asset", assets=[self], session=session + ) + + # handle the case that asset has not yet been added + if dag_dependencies: + yield dag_dependencies[0] + else: + yield DagDependency( + source=source or "asset", + target=target or "asset", + label=self.name, + dependency_type="asset", + dependency_id=None, + ) def asprofile(self) -> AssetProfile: """ From ed334e4e378bdaff6e2bd6a4ea36b0a6db80a5a6 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 7 Mar 2025 23:09:29 +0800 Subject: [PATCH 11/21] feat: remove db access in serialized object --- airflow/serialization/serialized_objects.py | 21 ++++++------- .../airflow/sdk/definitions/asset/__init__.py | 30 +++++++------------ 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 8ad677a258a26..9f423e117df7d 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -39,7 +39,6 @@ from airflow import macros from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred -from airflow.models.asset import resolve_assets_as_dag_dependencies from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval @@ -92,7 +91,6 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string, qualname from airflow.utils.operator_resources import Resources -from airflow.utils.session import create_session from airflow.utils.timezone import from_timestamp, parse_timezone from airflow.utils.types import NOTSET, ArgNotSet @@ -1115,20 +1113,19 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: ) ) - assets = [] for obj in task.outlets or []: if isinstance(obj, Asset): - assets.append(obj) - elif isinstance(obj, AssetAlias): - deps.extend(obj.iter_dag_dependencies(source=task.dag_id, target="")) - - if assets: - with create_session() as session: - deps.extend( - resolve_assets_as_dag_dependencies( - source=task.dag_id, target="asset", assets=assets, session=session + deps.append( + DagDependency( + source=task.dag_id, + target="asset", + label=obj.name, + dependency_type="asset", + dependency_id=None, ) ) + elif isinstance(obj, AssetAlias): + deps.extend(obj.iter_dag_dependencies(source=task.dag_id, target="")) return deps diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 0b35d56466681..9ddfb657d8c93 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -442,25 +442,15 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe :meta private: """ - from airflow.models.asset import resolve_assets_as_dag_dependencies - from airflow.utils.session import create_session - - with create_session() as session: - dag_dependencies = resolve_assets_as_dag_dependencies( - source=source or "asset", target=target or "asset", assets=[self], session=session - ) - - # handle the case that asset has not yet been added - if dag_dependencies: - yield dag_dependencies[0] - else: - yield DagDependency( - source=source or "asset", - target=target or "asset", - label=self.name, - dependency_type="asset", - dependency_id=None, - ) + yield DagDependency( + source=source or "asset", + target=target or "asset", + label=self.name, + dependency_type="asset", + # We can't get asset id at this stage. + # This will be updated when running SerializedDagModel.get_dag_dependencies + dependency_id=None, + ) def asprofile(self) -> AssetProfile: """ @@ -498,6 +488,7 @@ def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterat yield DagDependency( source=source or "asset-ref", target=target or "asset-ref", + label=dependency_id, dependency_type="asset-ref", dependency_id=dependency_id, ) @@ -562,6 +553,7 @@ def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterat yield DagDependency( source=source or "asset-alias", target=target or "asset-alias", + label=self.name, dependency_type="asset-alias", dependency_id=self.name, ) From ddba315732f0e16fa0bba4c472d232ee0fb07498 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 10 Mar 2025 20:41:45 +0800 Subject: [PATCH 12/21] feat(dag_dependency): improve dependency_type typing and split asset-ref to asset-name-ref and asset-uri-ref --- airflow/serialization/dag_dependency.py | 3 ++- .../src/airflow/sdk/definitions/asset/__init__.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow/serialization/dag_dependency.py b/airflow/serialization/dag_dependency.py index b4ad79ee70acf..6633515275472 100644 --- a/airflow/serialization/dag_dependency.py +++ b/airflow/serialization/dag_dependency.py @@ -17,6 +17,7 @@ from __future__ import annotations from dataclasses import dataclass +from typing import Literal @dataclass(frozen=True, order=True) @@ -30,7 +31,7 @@ class DagDependency: source: str target: str label: str - dependency_type: str + dependency_type: Literal["asset", "asset-name-ref", "asset-uri-ref", "tirgger", "sensor"] dependency_id: str | None = None @property diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 9ddfb657d8c93..5383c88ea58cd 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -22,7 +22,7 @@ import os import urllib.parse import warnings -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Union, overload +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Literal, Union, overload import attrs @@ -471,6 +471,8 @@ class AssetRef(BaseAsset, AttrsInstance): :meta private: """ + _dependency_type: Literal["asset-name-ref", "asset-uri-ref"] + def as_expression(self) -> Any: return {"asset_ref": attrs.asdict(self)} @@ -486,10 +488,10 @@ def iter_asset_refs(self) -> Iterator[AssetRef]: def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]: (dependency_id,) = attrs.astuple(self) yield DagDependency( - source=source or "asset-ref", - target=target or "asset-ref", + source=source or self._dependency_type, + target=target or self._dependency_type, label=dependency_id, - dependency_type="asset-ref", + dependency_type=self._dependency_type, dependency_id=dependency_id, ) @@ -500,6 +502,8 @@ class AssetNameRef(AssetRef): name: str + _dependency_type = "asset-name-ref" + @attrs.define(hash=True) class AssetUriRef(AssetRef): @@ -507,6 +511,8 @@ class AssetUriRef(AssetRef): uri: str + _dependency_type = "asset-uri-ref" + class Dataset(Asset): """A representation of dataset dependencies between workflows.""" From 42517afec44f05dd92905163ee08b5fc141af5d9 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 11 Mar 2025 10:24:26 +0800 Subject: [PATCH 13/21] feat(serialized_dag): resolve dag dependency to include asset id --- airflow/models/serialized_dag.py | 15 ++++++++++++--- airflow/serialization/dag_dependency.py | 2 +- airflow/serialization/serialized_objects.py | 2 +- airflow/utils/dot_renderer.py | 8 ++++++-- .../src/airflow/sdk/definitions/asset/__init__.py | 10 +++++++++- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 5fae9441052ee..0eb6e960d66c6 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -27,17 +27,22 @@ import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_ from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType from airflow.exceptions import TaskNotFound +from airflow.models.asset import ( + AssetAliasModel, + AssetModel, +) from airflow.models.base import ID_LEN, Base from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun +from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.serialized_objects import SerializedDAG from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json @@ -477,7 +482,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ .join(cls.dag_model) .where(DagModel.is_active) ) - iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query) + iterator = [(dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query] else: iterator = session.execute( select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies")) @@ -489,7 +494,11 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ .join(cls.dag_model) .where(DagModel.is_active) ) - return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator} + + resolver = _DagDependenciesResolver(dag_id_dependencies=iterator, session=session) + dag_depdendencies_by_dag = resolver.resolve() + + return dag_depdendencies_by_dag @staticmethod @provide_session diff --git a/airflow/serialization/dag_dependency.py b/airflow/serialization/dag_dependency.py index 6633515275472..745a487eed42a 100644 --- a/airflow/serialization/dag_dependency.py +++ b/airflow/serialization/dag_dependency.py @@ -31,7 +31,7 @@ class DagDependency: source: str target: str label: str - dependency_type: Literal["asset", "asset-name-ref", "asset-uri-ref", "tirgger", "sensor"] + dependency_type: Literal["asset", "asset-name-ref", "asset-uri-ref", "asset-alias", "trigger", "sensor"] dependency_id: str | None = None @property diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9f423e117df7d..b01117b9ec61a 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1121,7 +1121,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: target="asset", label=obj.name, dependency_type="asset", - dependency_id=None, + dependency_id=AssetUniqueKey.from_asset(obj).to_str(), ) ) elif isinstance(obj, AssetAlias): diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py index e0d8c5a2fa765..fc1685b68b61f 100644 --- a/airflow/utils/dot_renderer.py +++ b/airflow/utils/dot_renderer.py @@ -179,8 +179,12 @@ def render_dag_dependencies(deps: dict[str, list[DagDependency]]) -> graphviz.Di "label": dag, }, ) as dep_subgraph: - dep_subgraph.edge(dep.source, dep.label) - dep_subgraph.edge(dep.label, dep.target) + leaf_nodes = ("asset", "asset-name-ref", "asset-uri-ref", "asset-alias") + if dep.source not in leaf_nodes: + dep_subgraph.edge(dep.source, dep.dependency_id) + + if dep.target not in leaf_nodes: + dep_subgraph.edge(dep.dependency_id, dep.target) return dot diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 5383c88ea58cd..43f0d7c2c1359 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -17,6 +17,7 @@ from __future__ import annotations +import json import logging import operator import os @@ -77,6 +78,13 @@ def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: def to_asset(self) -> Asset: return Asset(name=self.name, uri=self.uri) + @staticmethod + def from_str(key: str) -> AssetUniqueKey: + return AssetUniqueKey(**json.loads(key)) + + def to_str(self) -> str: + return json.dumps(attrs.asdict(self)) + @attrs.define(frozen=True) class AssetAliasUniqueKey: @@ -449,7 +457,7 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe dependency_type="asset", # We can't get asset id at this stage. # This will be updated when running SerializedDagModel.get_dag_dependencies - dependency_id=None, + dependency_id=AssetUniqueKey.from_asset(self).to_str(), ) def asprofile(self) -> AssetProfile: From a00ea544bafcaf5bc69d9058b8af2aa366b9d3b9 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Mar 2025 17:11:23 +0800 Subject: [PATCH 14/21] refactor(serialized_dag): extract dag dependency resolving logic as a class --- airflow/models/serialized_dag.py | 183 ++++++++++++++++++++++++++++++- 1 file changed, 182 insertions(+), 1 deletion(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 0eb6e960d66c6..7fbbbc5bd338f 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -21,7 +21,7 @@ import logging import zlib -from collections.abc import Iterable +from collections.abc import Iterable, Iterator, Sequence from datetime import timedelta from typing import TYPE_CHECKING, Any @@ -63,6 +63,187 @@ log = logging.getLogger(__name__) +class _DagDependenciesResolver: + """Resolver that resolves dag dependencies to include asset id and assets link to asset aliases.""" + + def __init__(self, dag_id_dependencies: Sequence[tuple[str, dict]], session: Session) -> None: + self.dag_id_dependencies = dag_id_dependencies + self.session = session + + self.asset_key_to_id: dict[AssetUniqueKey, int] = {} + self.asset_ref_name_to_asset_id_name: dict[str, tuple[int, str]] = {} + self.asset_ref_uri_to_asset_id_name: dict[str, tuple[int, str]] = {} + self.alias_names_to_asset_ids_names: dict[str, list[tuple[int, str]]] = {} + + def resolve(self) -> dict[str, list[DagDependency]]: + asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names = self.collect_asset_info() + + self.asset_key_to_id = self.collect_asset_key_to_ids(asset_names_uris) + self.asset_ref_name_to_asset_id_name = self.collect_asset_name_ref_to_ids_names(asset_ref_names) + self.asset_ref_uri_to_asset_id_name = self.collect_asset_uri_ref_to_ids_names(asset_ref_uris) + self.alias_names_to_asset_ids_names = self.collect_alias_to_assets(asset_alias_names) + + dag_depdendencies_by_dag: dict[str, list[DagDependency]] = {} + for dag_id, deps_data in self.dag_id_dependencies: + dag_deps: list[DagDependency] = [] + for dep_data in deps_data or {}: + dep_type = dep_data["dependency_type"] + if dep_type == "asset": + dag_deps.append(self.resolve_asset_dag_dep(dep_data)) + elif dep_type == "asset-name-ref": + dag_deps.extend(self.resolve_asset_name_ref_dag_dep(dep_data)) + elif dep_type == "asset-uri-ref": + dag_deps.extend(self.resolve_asset_uri_ref_dag_dep(dep_data)) + elif dep_type == "asset-alias": + dag_deps.extend(self.resolve_asset_alias_dag_dep(dep_data)) + else: + # Replace asset_key with asset id if it's in source or target + for node_key in ("source", "target"): + if dep_data[node_key].startswith("asset:"): + unique_key = AssetUniqueKey.from_str(dep_data[node_key].split(":")[1]) + asset_id = self.asset_key_to_id[unique_key] + dep_data[node_key] = f"asset:{asset_id}" + break + + dag_deps.append(DagDependency(**dep_data)) + + dag_depdendencies_by_dag[dag_id] = dag_deps + return dag_depdendencies_by_dag + + def collect_asset_info(self) -> tuple[set, set, set, set]: + asset_names_uris: set[tuple[str, str]] = set() + asset_ref_names: set[str] = set() + asset_ref_uris: set[str] = set() + asset_alias_names: set[str] = set() + for _, deps_data in self.dag_id_dependencies: + for dep_data in deps_data or {}: + dep_type = dep_data["dependency_type"] + dep_id = dep_data["dependency_id"] + if dep_type == "asset": + unique_key = AssetUniqueKey.from_str(dep_id) + asset_names_uris.add((unique_key.name, unique_key.uri)) + elif dep_type == "asset-name-ref": + asset_ref_names.add(dep_id) + elif dep_type == "asset-uri-ref": + asset_ref_uris.add(dep_id) + elif dep_type == "asset-alias": + asset_alias_names.add(dep_id) + return asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names + + def collect_asset_key_to_ids(self, asset_name_uris: set[tuple[str, str]]) -> dict[AssetUniqueKey, int]: + return { + AssetUniqueKey(name=name, uri=uri): asset_id + for name, uri, asset_id in self.session.execute( + select(AssetModel.name, AssetModel.uri, AssetModel.id).where( + tuple_(AssetModel.name, AssetModel.uri).in_(asset_name_uris) + ) + ) + } + + def collect_asset_name_ref_to_ids_names(self, asset_ref_names) -> dict[str, tuple[int, str]]: + return { + name: (asset_id, name) + for name, asset_id in self.session.execute( + select(AssetModel.name, AssetModel.id).where( + AssetModel.name.in_(asset_ref_names), AssetModel.active.has() + ) + ) + } + + def collect_asset_uri_ref_to_ids_names(self, asset_ref_uris) -> dict[str, tuple[int, str]]: + return { + uri: (asset_id, name) + for uri, name, asset_id in self.session.execute( + select(AssetModel.uri, AssetModel.name, AssetModel.id).where( + AssetModel.uri.in_(asset_ref_uris), AssetModel.active.has() + ) + ) + } + + def collect_alias_to_assets(self, asset_alias_names) -> dict[str, list[tuple[int, str]]]: + return { + aam.name: [(am.id, am.name) for am in aam.assets] + for aam in self.session.scalars( + select(AssetAliasModel).where(AssetAliasModel.name.in_(asset_alias_names)) + ) + } + + def resolve_asset_dag_dep(self, dep_data: dict) -> DagDependency: + dep_id = dep_data["dependency_id"] + unique_key = AssetUniqueKey.from_str(dep_id) + dep_data["dependency_id"] = str(self.asset_key_to_id[unique_key]) + return DagDependency(**dep_data) + + def resolve_asset_name_ref_dag_dep(self, dep_data) -> Sequence[DagDependency]: + dep_id = dep_data["dependency_id"] + is_source_ref = dep_data["source"] == "asest-name-ref" + asset_id, asset_name = self.asset_ref_name_to_asset_id_name[dep_id] + return [ + # asset + DagDependency( + source="asset" if is_source_ref else f"asset-name-ref:{dep_id}", + target=f"asset-name-ref:{dep_id}" if is_source_ref else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset ref + DagDependency( + source=f"asset:{asset_id}" if is_source_ref else dep_data["source"], + target=dep_data["target"] if is_source_ref else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-name-ref", + dependency_id=dep_id, + ), + ] + + def resolve_asset_uri_ref_dag_dep(self, dep_data: dict) -> Sequence[DagDependency]: + dep_id = dep_data["dependency_id"] + is_source_ref = dep_data["source"] == "asest-uri-ref" + asset_id, asset_name = self.asset_ref_uri_to_asset_id_name[dep_id] + return [ + # asset + DagDependency( + source="asset" if is_source_ref else f"asset-uri-ref:{dep_id}", + target=f"asset-uri-ref:{dep_id}" if is_source_ref else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset ref + DagDependency( + source=f"asset:{asset_id}" if is_source_ref else dep_data["source"], + target=dep_data["target"] if is_source_ref else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-uri-ref", + dependency_id=dep_id, + ), + ] + + def resolve_asset_alias_dag_dep(self, dep_data: dict) -> Iterator[DagDependency]: + dep_id = dep_data["dependency_id"] + for asset_id, asset_name in self.alias_names_to_asset_ids_names[dep_id]: + is_source_alias = dep_data["source"] == "asset-alias" + yield from [ + # asset + DagDependency( + source="asset" if is_source_alias else f"asset-alias:{dep_id}", + target=f"asset-alias:{dep_id}" if is_source_alias else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset alias + DagDependency( + source=f"asset:{asset_id}" if is_source_alias else dep_data["source"], + target=dep_data["target"] if is_source_alias else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-alias", + dependency_id=dep_id, + ), + ] + + class SerializedDagModel(Base): """ A table for serialized DAGs. From 9652506126a4630624a8d4ded89f8f718327668b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Mar 2025 17:28:38 +0800 Subject: [PATCH 15/21] test(dag_serialization): fix dag_serialization tests --- tests/serialization/test_dag_serialization.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 973be6bdb5853..ede6fe277cfd0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -67,7 +67,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor -from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.asset import Asset, AssetUniqueKey from airflow.sdk.definitions.param import Param, ParamsDict from airflow.security import permissions from airflow.serialization.enums import Encoding @@ -1713,6 +1713,8 @@ def other_asset_writer(x): other_asset_writer.expand(x=[1, 2]) + testing_asset_key_strs = [AssetUniqueKey.from_asset(asset).to_str() for asset in testing_assets] + dag = SerializedDAG.to_dict(dag) actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) expected = sorted( @@ -1722,7 +1724,7 @@ def other_asset_writer(x): "target": "asset", "label": "asset4", "dependency_type": "asset", - "dependency_id": "4", + "dependency_id": testing_asset_key_strs[3], }, { "source": "external_dag_id", @@ -1736,49 +1738,49 @@ def other_asset_writer(x): "target": "asset", "label": "asset3", "dependency_type": "asset", - "dependency_id": "3", + "dependency_id": testing_asset_key_strs[2], }, { "source": "test", "target": "asset", "label": "asset2", "dependency_type": "asset", - "dependency_id": "2", + "dependency_id": testing_asset_key_strs[1], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, ], key=lambda x: tuple(x.values()), @@ -1789,6 +1791,9 @@ def other_asset_writer(x): def test_dag_deps_assets(self, testing_assets): """ Check that dag_dependencies node is populated correctly for a DAG with assets. + + Note that asset id will not be stored at this stage and will be later evaluated when + calling SerializedDagModel.get_dag_dependencies. """ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor @@ -1807,6 +1812,8 @@ def other_asset_writer(x): other_asset_writer.expand(x=[1, 2]) + testing_asset_key_strs = [AssetUniqueKey.from_asset(asset).to_str() for asset in testing_assets] + dag = SerializedDAG.to_dict(dag) actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) expected = sorted( @@ -1816,7 +1823,7 @@ def other_asset_writer(x): "target": "asset", "label": "asset4", "dependency_type": "asset", - "dependency_id": "4", + "dependency_id": testing_asset_key_strs[3], }, { "source": "external_dag_id", @@ -1830,21 +1837,21 @@ def other_asset_writer(x): "target": "asset", "label": "asset3", "dependency_type": "asset", - "dependency_id": "3", + "dependency_id": testing_asset_key_strs[2], }, { "source": "test", "target": "asset", "label": "asset2", "dependency_type": "asset", - "dependency_id": "2", + "dependency_id": testing_asset_key_strs[1], }, { "source": "asset", "target": "test", "label": "asset1", "dependency_type": "asset", - "dependency_id": "1", + "dependency_id": testing_asset_key_strs[0], }, ], key=lambda x: tuple(x.values()), From ec26c2352cb2695fb5df39814bfc31438ca6e35d Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Mar 2025 17:59:29 +0800 Subject: [PATCH 16/21] test(api_fastapi): update test cases with asset_id changes --- .../core_api/routes/ui/test_dependencies.py | 74 ++++++++++++++----- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py index 7f8f2c751a535..4855961195b10 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py +++ b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py @@ -18,7 +18,9 @@ import pendulum import pytest +from sqlalchemy import select +from airflow.models.asset import AssetModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor @@ -36,7 +38,12 @@ def cleanup(): @pytest.fixture -def make_primary_connected_component(dag_maker): +def asset1() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + + +@pytest.fixture +def make_primary_connected_component(dag_maker, asset1): with dag_maker( dag_id="external_trigger_dag_id", serialized=True, @@ -47,13 +54,12 @@ def make_primary_connected_component(dag_maker): with dag_maker(dag_id="other_dag", serialized=True): EmptyOperator(task_id="task1") - asset = Asset(uri="s3://bucket/next-run-asset/1", name="asset1") with dag_maker(dag_id="upstream", serialized=True): - EmptyOperator(task_id="task2", outlets=[asset]) + EmptyOperator(task_id="task2", outlets=[asset1]) with dag_maker( dag_id="downstream", - schedule=[asset], + schedule=[asset1], serialized=True, ): EmptyOperator(task_id="task1") >> ExternalTaskSensor( @@ -64,11 +70,18 @@ def make_primary_connected_component(dag_maker): @pytest.fixture -def expected_primary_component_response(): +def asset1_id(make_primary_connected_component, asset1, session) -> int: + return session.scalar( + select(AssetModel.id).where(AssetModel.name == asset1.name, AssetModel.uri == asset1.uri) + ) + + +@pytest.fixture +def expected_primary_component_response(asset1_id): return { "edges": [ { - "source_id": "asset:1", + "source_id": f"asset:{asset1_id}", "target_id": "dag:downstream", }, { @@ -81,7 +94,7 @@ def expected_primary_component_response(): }, { "source_id": "dag:upstream", - "target_id": "asset:1", + "target_id": f"asset:{asset1_id}", }, { "source_id": "sensor:other_dag:downstream:external_task_sensor", @@ -99,7 +112,7 @@ def expected_primary_component_response(): "type": "dag", }, { - "id": "asset:1", + "id": f"asset:{asset1_id}", "label": "asset1", "type": "asset", }, @@ -128,15 +141,18 @@ def expected_primary_component_response(): @pytest.fixture -def make_secondary_connected_component(dag_maker): - asset = Asset(uri="s3://bucket/next-run-asset/2", name="asset2") +def asset2() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + +@pytest.fixture +def make_secondary_connected_component(dag_maker, asset2): with dag_maker(dag_id="upstream_secondary", serialized=True): - EmptyOperator(task_id="task2", outlets=[asset]) + EmptyOperator(task_id="task2", outlets=[asset2]) with dag_maker( dag_id="downstream_secondary", - schedule=[asset], + schedule=[asset2], serialized=True, ): EmptyOperator(task_id="task1") @@ -145,16 +161,23 @@ def make_secondary_connected_component(dag_maker): @pytest.fixture -def expected_secondary_component_response(): +def asset2_id(make_secondary_connected_component, asset2, session) -> int: + return session.scalar( + select(AssetModel.id).where(AssetModel.name == asset2.name, AssetModel.uri == asset2.uri) + ) + + +@pytest.fixture +def expected_secondary_component_response(asset2_id): return { "edges": [ { - "source_id": "asset:2", + "source_id": f"asset:{asset2_id}", "target_id": "dag:downstream_secondary", }, { "source_id": "dag:upstream_secondary", - "target_id": "asset:2", + "target_id": f"asset:{asset2_id}", }, ], "nodes": [ @@ -164,7 +187,7 @@ def expected_secondary_component_response(): "type": "dag", }, { - "id": "asset:2", + "id": f"asset:{asset2_id}", "label": "asset2", "type": "asset", }, @@ -199,7 +222,6 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): "node_id, expected_response_fixture", [ # Primary Component - ("asset:asset1", "expected_primary_component_response"), ("dag:downstream", "expected_primary_component_response"), ("sensor:other_dag:downstream:external_task_sensor", "expected_primary_component_response"), ("dag:external_trigger_dag_id", "expected_primary_component_response"), @@ -209,7 +231,6 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): ), ("dag:upstream", "expected_primary_component_response"), # Secondary Component - ("asset:asset2", "expected_secondary_component_response"), ("dag:downstream_secondary", "expected_secondary_component_response"), ("dag:upstream_secondary", "expected_secondary_component_response"), ], @@ -222,6 +243,23 @@ def test_with_node_id_filter(self, test_client, node_id, expected_response_fixtu assert response.json() == expected_response + def test_with_node_id_filter_with_asset( + self, + test_client, + asset1_id, + asset2_id, + expected_primary_component_response, + expected_secondary_component_response, + ): + for asset_id, expected_response in ( + (asset1_id, expected_primary_component_response), + (asset2_id, expected_secondary_component_response), + ): + response = test_client.get("/ui/dependencies", params={"node_id": f"asset:{asset_id}"}) + assert response.status_code == 200 + + assert response.json() == expected_response + @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") def test_with_node_id_filter_not_found(self, test_client): response = test_client.get("/ui/dependencies", params={"node_id": "missing_node_id"}) From d4503040917bcc208382a6f49dd1538d7e3cdbe5 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Mar 2025 22:51:55 +0800 Subject: [PATCH 17/21] test(api_fastapi): fix test case with dynamic asset id --- .../core_api/routes/ui/test_structure.py | 208 +++++++++++++++++- 1 file changed, 206 insertions(+), 2 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index afe504d74bff2..ce2c9dce4dbed 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -21,8 +21,10 @@ import pendulum import pytest +from sqlalchemy import select from airflow.models import DagBag +from airflow.models.asset import AssetModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor @@ -97,7 +99,12 @@ def clean(): @pytest.fixture -def make_dag(dag_maker, session, time_machine): +def asset3(): + return Dataset(uri="s3://dataset-bucket/example.csv") + + +@pytest.fixture +def make_dag(dag_maker, session, time_machine, asset3): with dag_maker( dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, @@ -120,7 +127,7 @@ def make_dag(dag_maker, session, time_machine): ), ): ( - EmptyOperator(task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")]) + EmptyOperator(task_id="task_1", outlets=[asset3]) >> ExternalTaskSensor(task_id="external_task_sensor", external_dag_id=DAG_ID) >> EmptyOperator(task_id="task_2") ) @@ -128,6 +135,13 @@ def make_dag(dag_maker, session, time_machine): dag_maker.sync_dagbag_to_db() +@pytest.fixture +def asset3_id(make_dag, asset3, session) -> int: + return session.scalar( + select(AssetModel.id).where(AssetModel.name == asset3.name, AssetModel.uri == asset3.uri) + ) + + class TestStructureDataEndpoint: @pytest.mark.parametrize( "params, expected", @@ -265,6 +279,196 @@ def test_should_return_200(self, test_client, params, expected): assert response.status_code == 200 assert response.json() == expected + @pytest.mark.usefixtures("make_dag") + def test_should_return_200_with_asset(self, test_client, asset3_id): + params = { + "dag_id": DAG_ID, + "external_dependencies": True, + } + expected = { + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "and-gate-0", + "target_id": "task_1", + "is_source_asset": True, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset1", + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset2", + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "example-alias", + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", + "target_id": "task_1", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", + "target_id": "task_1", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "external_task_sensor", + "target_id": "task_2", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_2", + "target_id": f"asset:{asset3_id}", + "is_source_asset": None, + }, + ], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + "asset_condition_type": None, + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "ExternalTaskSensor", + "asset_condition_type": None, + }, + { + "children": None, + "id": "task_2", + "is_mapped": None, + "label": "task_2", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + "asset_condition_type": None, + }, + { + "children": None, + "id": f"asset:{asset3_id}", + "is_mapped": None, + "label": "s3://dataset-bucket/example.csv", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "sensor", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "and-gate-0", + "is_mapped": None, + "label": "and-gate-0", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset-condition", + "operator": None, + "asset_condition_type": "and-gate", + }, + { + "children": None, + "id": "asset1", + "is_mapped": None, + "label": "asset1", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "asset2", + "is_mapped": None, + "label": "asset2", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "example-alias", + "is_mapped": None, + "label": "example-alias", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset-alias", + "operator": None, + "asset_condition_type": None, + }, + ], + } + + response = test_client.get("/ui/structure/structure_data", params=params) + assert response.status_code == 200 + assert response.json() == expected + @pytest.mark.parametrize( "params, expected", [ From b17ad64086673c899e4550c92b645a21d5bbccd4 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 20 Mar 2025 12:39:25 +0800 Subject: [PATCH 18/21] fix(serialized_dag): fix postgrest dag dep handling --- airflow/models/serialized_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 7fbbbc5bd338f..b1defa8f9ddff 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -674,7 +674,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ ) .join(cls.dag_model) .where(DagModel.is_active) - ) + ).all() resolver = _DagDependenciesResolver(dag_id_dependencies=iterator, session=session) dag_depdendencies_by_dag = resolver.resolve() From 453d756969211ff03db15443faa8e9d0157c2d94 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Thu, 20 Mar 2025 10:32:54 -0400 Subject: [PATCH 19/21] Fix asset graph in UI --- airflow/ui/src/pages/Asset/AssetGraph.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/ui/src/pages/Asset/AssetGraph.tsx b/airflow/ui/src/pages/Asset/AssetGraph.tsx index 1fa904017eca0..72d52fbc10a42 100644 --- a/airflow/ui/src/pages/Asset/AssetGraph.tsx +++ b/airflow/ui/src/pages/Asset/AssetGraph.tsx @@ -41,7 +41,7 @@ export const AssetGraph = ({ asset }: { readonly asset?: AssetResponse }) => { const { colorMode = "light" } = useColorMode(); const { data = { edges: [], nodes: [] } } = useDependenciesServiceGetDependencies( - { nodeId: `asset:${asset?.name}` }, + { nodeId: `asset:${asset?.id}` }, undefined, { enabled: Boolean(asset) && Boolean(asset?.name) }, ); From 5c98ac53d8d80a0cfe474d3279497640fe6a428a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Mar 2025 14:52:55 +0800 Subject: [PATCH 20/21] test(api_fastapi): fix structure endpoint test case --- .../core_api/routes/ui/test_structure.py | 49 +++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index ce2c9dce4dbed..51370ea13db2b 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -22,6 +22,7 @@ import pendulum import pytest from sqlalchemy import select +from sqlalchemy.orm import Session from airflow.models import DagBag from airflow.models.asset import AssetModel @@ -98,13 +99,23 @@ def clean(): clear_db_runs() +@pytest.fixture +def asset1(): + return Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + + +@pytest.fixture +def asset2(): + return Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + + @pytest.fixture def asset3(): return Dataset(uri="s3://dataset-bucket/example.csv") @pytest.fixture -def make_dag(dag_maker, session, time_machine, asset3): +def make_dag(dag_maker, session, time_machine, asset1, asset2, asset3): with dag_maker( dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, @@ -120,11 +131,7 @@ def make_dag(dag_maker, session, time_machine, asset3): serialized=True, session=session, start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), - schedule=( - Asset(uri="s3://bucket/next-run-asset/1", name="asset1") - & Asset(uri="s3://bucket/next-run-asset/2", name="asset2") - & AssetAlias("example-alias") - ), + schedule=(asset1 & asset2 & AssetAlias("example-alias")), ): ( EmptyOperator(task_id="task_1", outlets=[asset3]) @@ -135,13 +142,27 @@ def make_dag(dag_maker, session, time_machine, asset3): dag_maker.sync_dagbag_to_db() -@pytest.fixture -def asset3_id(make_dag, asset3, session) -> int: +def _fetch_asset_id(asset: Asset, session: Session) -> int: return session.scalar( - select(AssetModel.id).where(AssetModel.name == asset3.name, AssetModel.uri == asset3.uri) + select(AssetModel.id).where(AssetModel.name == asset.name, AssetModel.uri == asset.uri) ) +@pytest.fixture +def asset1_id(make_dag, asset1, session: Session) -> int: + return _fetch_asset_id(asset1, session) + + +@pytest.fixture +def asset2_id(make_dag, asset2, session) -> int: + return _fetch_asset_id(asset2, session) + + +@pytest.fixture +def asset3_id(make_dag, asset3, session) -> int: + return _fetch_asset_id(asset3, session) + + class TestStructureDataEndpoint: @pytest.mark.parametrize( "params, expected", @@ -280,7 +301,7 @@ def test_should_return_200(self, test_client, params, expected): assert response.json() == expected @pytest.mark.usefixtures("make_dag") - def test_should_return_200_with_asset(self, test_client, asset3_id): + def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, asset3_id): params = { "dag_id": DAG_ID, "external_dependencies": True, @@ -297,14 +318,14 @@ def test_should_return_200_with_asset(self, test_client, asset3_id): { "is_setup_teardown": None, "label": None, - "source_id": "asset1", + "source_id": asset1_id, "target_id": "and-gate-0", "is_source_asset": None, }, { "is_setup_teardown": None, "label": None, - "source_id": "asset2", + "source_id": asset2_id, "target_id": "and-gate-0", "is_source_asset": None, }, @@ -431,7 +452,7 @@ def test_should_return_200_with_asset(self, test_client, asset3_id): }, { "children": None, - "id": "asset1", + "id": asset1_id, "is_mapped": None, "label": "asset1", "tooltip": None, @@ -442,7 +463,7 @@ def test_should_return_200_with_asset(self, test_client, asset3_id): }, { "children": None, - "id": "asset2", + "id": asset2_id, "is_mapped": None, "label": "asset2", "tooltip": None, From 7b2de242ef14114dbd1907db92d3fc992b4a8c93 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Mar 2025 16:13:06 +0800 Subject: [PATCH 21/21] fix(api_fastapi): fix /ui/structure --- .../core_api/routes/ui/structure.py | 12 +-- .../core_api/services/ui/structure.py | 80 ++++++++++++------- .../core_api/routes/ui/test_structure.py | 24 +++--- 3 files changed, 70 insertions(+), 46 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index c21adccf4b834..05fb79bd0bf29 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -130,11 +130,13 @@ def structure_data( } ) - upstream_asset_nodes, upstream_asset_edges = get_upstream_assets( - dag.timetable.asset_condition, entry_node_ref["id"] - ) + if asset_expression := serialized_dag.dag_model.asset_expression: + upstream_asset_nodes, upstream_asset_edges = get_upstream_assets( + asset_expression, entry_node_ref["id"] + ) + data["nodes"] += upstream_asset_nodes + data["edges"] = upstream_asset_edges - data["nodes"] += upstream_asset_nodes - data["edges"] = upstream_asset_edges + start_edges + edges + end_edges + data["edges"] += start_edges + edges + end_edges return StructureDataResponse(**data) diff --git a/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow/api_fastapi/core_api/services/ui/structure.py index 72ee000b1f5a5..128dc93b7706d 100644 --- a/airflow/api_fastapi/core_api/services/ui/structure.py +++ b/airflow/api_fastapi/core_api/services/ui/structure.py @@ -23,37 +23,43 @@ from __future__ import annotations -from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset - def get_upstream_assets( - asset_condition: BaseAsset, entry_node_ref: str, level=0 + asset_expression: dict, entry_node_ref: str, level: int = 0 ) -> tuple[list[dict], list[dict]]: edges: list[dict] = [] nodes: list[dict] = [] - asset_condition_type: str | None = None - - assets: list[Asset | AssetAlias] = [] - - nested_expression: AssetAll | AssetAny | None = None - - if isinstance(asset_condition, AssetAny): - asset_condition_type = "or-gate" - - elif isinstance(asset_condition, AssetAll): - asset_condition_type = "and-gate" - - if hasattr(asset_condition, "objects"): - for obj in asset_condition.objects: - if isinstance(obj, (AssetAll, AssetAny)): - nested_expression = obj - elif isinstance(obj, (Asset, AssetAlias)): - assets.append(obj) + asset_expression_type: str | None = None + + # include assets, asset-alias, asset-name-refs, asset-uri-refs + assets_info: list[dict] = [] + + nested_expression: dict = {} + + expr_key = "" + if asset_expression.keys() == {"any"}: + asset_expression_type = "or-gate" + expr_key = "any" + elif asset_expression.keys() == {"all"}: + asset_expression_type = "and-gate" + expr_key = "all" + + if expr_key in asset_expression: + asset_exprs: list[dict] = asset_expression[expr_key] + for expr in asset_exprs: + nested_expr_key = next(iter(expr.keys())) + if nested_expr_key in ("any", "all"): + nested_expression = expr + elif nested_expr_key in ("asset", "alias", "asset-name-ref", "asset-uri-ref"): + asset_info = expr[nested_expr_key] + asset_info["type"] = nested_expr_key if nested_expr_key != "alias" else "asset-alias" + + assets_info.append(asset_info) else: - raise TypeError(f"Unsupported type: {type(obj)}") + raise TypeError(f"Unsupported type: {expr.keys()}") - if asset_condition_type and assets: - asset_condition_id = f"{asset_condition_type}-{level}" + if asset_expression_type and assets_info: + asset_condition_id = f"{asset_expression_type}-{level}" edges.append( { "source_id": asset_condition_id, @@ -66,22 +72,36 @@ def get_upstream_assets( "id": asset_condition_id, "label": asset_condition_id, "type": "asset-condition", - "asset_condition_type": asset_condition_type, + "asset_condition_type": asset_expression_type, } ) - for asset in assets: + for asset in assets_info: + asset_type = asset["type"] + + if asset_type == "asset": + source_id = str(asset["id"]) + label = asset["name"] + elif asset_type == "asset-alias" or asset_type == "asset-name-ref": + source_id = asset["name"] + label = asset["name"] + elif asset_type == "asset-uri-ref": + source_id = asset["uri"] + label = asset["uri"] + else: + raise TypeError(f"Unsupported type: {asset_type}") + edges.append( { - "source_id": asset.name, + "source_id": source_id, "target_id": asset_condition_id, } ) nodes.append( { - "id": asset.name, - "label": asset.name, - "type": "asset-alias" if isinstance(asset, AssetAlias) else "asset", + "id": source_id, + "label": label, + "type": asset_type, } ) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index 51370ea13db2b..3a97177460dfa 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -86,7 +86,7 @@ @pytest.fixture(autouse=True, scope="module") -def examples_dag_bag(): +def examples_dag_bag() -> DagBag: # Speed up: We don't want example dags for this module return DagBag(include_examples=False, read_dags_from_db=True) @@ -100,22 +100,22 @@ def clean(): @pytest.fixture -def asset1(): +def asset1() -> Asset: return Asset(uri="s3://bucket/next-run-asset/1", name="asset1") @pytest.fixture -def asset2(): +def asset2() -> Asset: return Asset(uri="s3://bucket/next-run-asset/2", name="asset2") @pytest.fixture -def asset3(): +def asset3() -> Dataset: return Dataset(uri="s3://dataset-bucket/example.csv") @pytest.fixture -def make_dag(dag_maker, session, time_machine, asset1, asset2, asset3): +def make_dag(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, asset3: Dataset) -> None: with dag_maker( dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, @@ -142,24 +142,26 @@ def make_dag(dag_maker, session, time_machine, asset1, asset2, asset3): dag_maker.sync_dagbag_to_db() -def _fetch_asset_id(asset: Asset, session: Session) -> int: - return session.scalar( - select(AssetModel.id).where(AssetModel.name == asset.name, AssetModel.uri == asset.uri) +def _fetch_asset_id(asset: Asset, session: Session) -> str: + return str( + session.scalar( + select(AssetModel.id).where(AssetModel.name == asset.name, AssetModel.uri == asset.uri) + ) ) @pytest.fixture -def asset1_id(make_dag, asset1, session: Session) -> int: +def asset1_id(make_dag, asset1, session: Session) -> str: return _fetch_asset_id(asset1, session) @pytest.fixture -def asset2_id(make_dag, asset2, session) -> int: +def asset2_id(make_dag, asset2, session) -> str: return _fetch_asset_id(asset2, session) @pytest.fixture -def asset3_id(make_dag, asset3, session) -> int: +def asset3_id(make_dag, asset3, session) -> str: return _fetch_asset_id(asset3, session)