diff --git a/airflow/api_fastapi/core_api/routes/ui/dependencies.py b/airflow/api_fastapi/core_api/routes/ui/dependencies.py index 539022e7b480b..8a0d8bbacd122 100644 --- a/airflow/api_fastapi/core_api/routes/ui/dependencies.py +++ b/airflow/api_fastapi/core_api/routes/ui/dependencies.py @@ -55,7 +55,7 @@ def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGra if dep.node_id not in nodes_dict: nodes_dict[dep.node_id] = { "id": dep.node_id, - "label": dep.dependency_id, + "label": dep.label, "type": dep.dependency_type, } diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index d96e633c28117..05fb79bd0bf29 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -125,16 +125,18 @@ def structure_data( nodes.append( { "id": dependency.node_id, - "label": dependency.dependency_id, + "label": dependency.label, "type": dependency.dependency_type, } ) - upstream_asset_nodes, upstream_asset_edges = get_upstream_assets( - dag.timetable.asset_condition, entry_node_ref["id"] - ) + if asset_expression := serialized_dag.dag_model.asset_expression: + upstream_asset_nodes, upstream_asset_edges = get_upstream_assets( + asset_expression, entry_node_ref["id"] + ) + data["nodes"] += upstream_asset_nodes + data["edges"] = upstream_asset_edges - data["nodes"] += upstream_asset_nodes - data["edges"] = upstream_asset_edges + start_edges + edges + end_edges + data["edges"] += start_edges + edges + end_edges return StructureDataResponse(**data) diff --git a/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow/api_fastapi/core_api/services/ui/structure.py index 72ee000b1f5a5..128dc93b7706d 100644 --- a/airflow/api_fastapi/core_api/services/ui/structure.py +++ b/airflow/api_fastapi/core_api/services/ui/structure.py @@ -23,37 +23,43 @@ from __future__ import annotations -from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset - def get_upstream_assets( - asset_condition: BaseAsset, entry_node_ref: str, level=0 + asset_expression: dict, entry_node_ref: str, level: int = 0 ) -> tuple[list[dict], list[dict]]: edges: list[dict] = [] nodes: list[dict] = [] - asset_condition_type: str | None = None - - assets: list[Asset | AssetAlias] = [] - - nested_expression: AssetAll | AssetAny | None = None - - if isinstance(asset_condition, AssetAny): - asset_condition_type = "or-gate" - - elif isinstance(asset_condition, AssetAll): - asset_condition_type = "and-gate" - - if hasattr(asset_condition, "objects"): - for obj in asset_condition.objects: - if isinstance(obj, (AssetAll, AssetAny)): - nested_expression = obj - elif isinstance(obj, (Asset, AssetAlias)): - assets.append(obj) + asset_expression_type: str | None = None + + # include assets, asset-alias, asset-name-refs, asset-uri-refs + assets_info: list[dict] = [] + + nested_expression: dict = {} + + expr_key = "" + if asset_expression.keys() == {"any"}: + asset_expression_type = "or-gate" + expr_key = "any" + elif asset_expression.keys() == {"all"}: + asset_expression_type = "and-gate" + expr_key = "all" + + if expr_key in asset_expression: + asset_exprs: list[dict] = asset_expression[expr_key] + for expr in asset_exprs: + nested_expr_key = next(iter(expr.keys())) + if nested_expr_key in ("any", "all"): + nested_expression = expr + elif nested_expr_key in ("asset", "alias", "asset-name-ref", "asset-uri-ref"): + asset_info = expr[nested_expr_key] + asset_info["type"] = nested_expr_key if nested_expr_key != "alias" else "asset-alias" + + assets_info.append(asset_info) else: - raise TypeError(f"Unsupported type: {type(obj)}") + raise TypeError(f"Unsupported type: {expr.keys()}") - if asset_condition_type and assets: - asset_condition_id = f"{asset_condition_type}-{level}" + if asset_expression_type and assets_info: + asset_condition_id = f"{asset_expression_type}-{level}" edges.append( { "source_id": asset_condition_id, @@ -66,22 +72,36 @@ def get_upstream_assets( "id": asset_condition_id, "label": asset_condition_id, "type": "asset-condition", - "asset_condition_type": asset_condition_type, + "asset_condition_type": asset_expression_type, } ) - for asset in assets: + for asset in assets_info: + asset_type = asset["type"] + + if asset_type == "asset": + source_id = str(asset["id"]) + label = asset["name"] + elif asset_type == "asset-alias" or asset_type == "asset-name-ref": + source_id = asset["name"] + label = asset["name"] + elif asset_type == "asset-uri-ref": + source_id = asset["uri"] + label = asset["uri"] + else: + raise TypeError(f"Unsupported type: {asset_type}") + edges.append( { - "source_id": asset.name, + "source_id": source_id, "target_id": asset_condition_id, } ) nodes.append( { - "id": asset.name, - "label": asset.name, - "type": "asset-alias" if isinstance(asset, AssetAlias) else "asset", + "id": source_id, + "label": label, + "type": asset_type, } ) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 5fae9441052ee..b1defa8f9ddff 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -21,23 +21,28 @@ import logging import zlib -from collections.abc import Iterable +from collections.abc import Iterable, Iterator, Sequence from datetime import timedelta from typing import TYPE_CHECKING, Any import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_ from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType from airflow.exceptions import TaskNotFound +from airflow.models.asset import ( + AssetAliasModel, + AssetModel, +) from airflow.models.base import ID_LEN, Base from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun +from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.serialized_objects import SerializedDAG from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json @@ -58,6 +63,187 @@ log = logging.getLogger(__name__) +class _DagDependenciesResolver: + """Resolver that resolves dag dependencies to include asset id and assets link to asset aliases.""" + + def __init__(self, dag_id_dependencies: Sequence[tuple[str, dict]], session: Session) -> None: + self.dag_id_dependencies = dag_id_dependencies + self.session = session + + self.asset_key_to_id: dict[AssetUniqueKey, int] = {} + self.asset_ref_name_to_asset_id_name: dict[str, tuple[int, str]] = {} + self.asset_ref_uri_to_asset_id_name: dict[str, tuple[int, str]] = {} + self.alias_names_to_asset_ids_names: dict[str, list[tuple[int, str]]] = {} + + def resolve(self) -> dict[str, list[DagDependency]]: + asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names = self.collect_asset_info() + + self.asset_key_to_id = self.collect_asset_key_to_ids(asset_names_uris) + self.asset_ref_name_to_asset_id_name = self.collect_asset_name_ref_to_ids_names(asset_ref_names) + self.asset_ref_uri_to_asset_id_name = self.collect_asset_uri_ref_to_ids_names(asset_ref_uris) + self.alias_names_to_asset_ids_names = self.collect_alias_to_assets(asset_alias_names) + + dag_depdendencies_by_dag: dict[str, list[DagDependency]] = {} + for dag_id, deps_data in self.dag_id_dependencies: + dag_deps: list[DagDependency] = [] + for dep_data in deps_data or {}: + dep_type = dep_data["dependency_type"] + if dep_type == "asset": + dag_deps.append(self.resolve_asset_dag_dep(dep_data)) + elif dep_type == "asset-name-ref": + dag_deps.extend(self.resolve_asset_name_ref_dag_dep(dep_data)) + elif dep_type == "asset-uri-ref": + dag_deps.extend(self.resolve_asset_uri_ref_dag_dep(dep_data)) + elif dep_type == "asset-alias": + dag_deps.extend(self.resolve_asset_alias_dag_dep(dep_data)) + else: + # Replace asset_key with asset id if it's in source or target + for node_key in ("source", "target"): + if dep_data[node_key].startswith("asset:"): + unique_key = AssetUniqueKey.from_str(dep_data[node_key].split(":")[1]) + asset_id = self.asset_key_to_id[unique_key] + dep_data[node_key] = f"asset:{asset_id}" + break + + dag_deps.append(DagDependency(**dep_data)) + + dag_depdendencies_by_dag[dag_id] = dag_deps + return dag_depdendencies_by_dag + + def collect_asset_info(self) -> tuple[set, set, set, set]: + asset_names_uris: set[tuple[str, str]] = set() + asset_ref_names: set[str] = set() + asset_ref_uris: set[str] = set() + asset_alias_names: set[str] = set() + for _, deps_data in self.dag_id_dependencies: + for dep_data in deps_data or {}: + dep_type = dep_data["dependency_type"] + dep_id = dep_data["dependency_id"] + if dep_type == "asset": + unique_key = AssetUniqueKey.from_str(dep_id) + asset_names_uris.add((unique_key.name, unique_key.uri)) + elif dep_type == "asset-name-ref": + asset_ref_names.add(dep_id) + elif dep_type == "asset-uri-ref": + asset_ref_uris.add(dep_id) + elif dep_type == "asset-alias": + asset_alias_names.add(dep_id) + return asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names + + def collect_asset_key_to_ids(self, asset_name_uris: set[tuple[str, str]]) -> dict[AssetUniqueKey, int]: + return { + AssetUniqueKey(name=name, uri=uri): asset_id + for name, uri, asset_id in self.session.execute( + select(AssetModel.name, AssetModel.uri, AssetModel.id).where( + tuple_(AssetModel.name, AssetModel.uri).in_(asset_name_uris) + ) + ) + } + + def collect_asset_name_ref_to_ids_names(self, asset_ref_names) -> dict[str, tuple[int, str]]: + return { + name: (asset_id, name) + for name, asset_id in self.session.execute( + select(AssetModel.name, AssetModel.id).where( + AssetModel.name.in_(asset_ref_names), AssetModel.active.has() + ) + ) + } + + def collect_asset_uri_ref_to_ids_names(self, asset_ref_uris) -> dict[str, tuple[int, str]]: + return { + uri: (asset_id, name) + for uri, name, asset_id in self.session.execute( + select(AssetModel.uri, AssetModel.name, AssetModel.id).where( + AssetModel.uri.in_(asset_ref_uris), AssetModel.active.has() + ) + ) + } + + def collect_alias_to_assets(self, asset_alias_names) -> dict[str, list[tuple[int, str]]]: + return { + aam.name: [(am.id, am.name) for am in aam.assets] + for aam in self.session.scalars( + select(AssetAliasModel).where(AssetAliasModel.name.in_(asset_alias_names)) + ) + } + + def resolve_asset_dag_dep(self, dep_data: dict) -> DagDependency: + dep_id = dep_data["dependency_id"] + unique_key = AssetUniqueKey.from_str(dep_id) + dep_data["dependency_id"] = str(self.asset_key_to_id[unique_key]) + return DagDependency(**dep_data) + + def resolve_asset_name_ref_dag_dep(self, dep_data) -> Sequence[DagDependency]: + dep_id = dep_data["dependency_id"] + is_source_ref = dep_data["source"] == "asest-name-ref" + asset_id, asset_name = self.asset_ref_name_to_asset_id_name[dep_id] + return [ + # asset + DagDependency( + source="asset" if is_source_ref else f"asset-name-ref:{dep_id}", + target=f"asset-name-ref:{dep_id}" if is_source_ref else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset ref + DagDependency( + source=f"asset:{asset_id}" if is_source_ref else dep_data["source"], + target=dep_data["target"] if is_source_ref else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-name-ref", + dependency_id=dep_id, + ), + ] + + def resolve_asset_uri_ref_dag_dep(self, dep_data: dict) -> Sequence[DagDependency]: + dep_id = dep_data["dependency_id"] + is_source_ref = dep_data["source"] == "asest-uri-ref" + asset_id, asset_name = self.asset_ref_uri_to_asset_id_name[dep_id] + return [ + # asset + DagDependency( + source="asset" if is_source_ref else f"asset-uri-ref:{dep_id}", + target=f"asset-uri-ref:{dep_id}" if is_source_ref else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset ref + DagDependency( + source=f"asset:{asset_id}" if is_source_ref else dep_data["source"], + target=dep_data["target"] if is_source_ref else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-uri-ref", + dependency_id=dep_id, + ), + ] + + def resolve_asset_alias_dag_dep(self, dep_data: dict) -> Iterator[DagDependency]: + dep_id = dep_data["dependency_id"] + for asset_id, asset_name in self.alias_names_to_asset_ids_names[dep_id]: + is_source_alias = dep_data["source"] == "asset-alias" + yield from [ + # asset + DagDependency( + source="asset" if is_source_alias else f"asset-alias:{dep_id}", + target=f"asset-alias:{dep_id}" if is_source_alias else "asset", + label=asset_name, + dependency_type="asset", + dependency_id=str(asset_id), + ), + # asset alias + DagDependency( + source=f"asset:{asset_id}" if is_source_alias else dep_data["source"], + target=dep_data["target"] if is_source_alias else f"asset:{asset_id}", + label=dep_id, + dependency_type="asset-alias", + dependency_id=dep_id, + ), + ] + + class SerializedDagModel(Base): """ A table for serialized DAGs. @@ -477,7 +663,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ .join(cls.dag_model) .where(DagModel.is_active) ) - iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query) + iterator = [(dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query] else: iterator = session.execute( select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies")) @@ -488,8 +674,12 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ ) .join(cls.dag_model) .where(DagModel.is_active) - ) - return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator} + ).all() + + resolver = _DagDependenciesResolver(dag_id_dependencies=iterator, session=session) + dag_depdendencies_by_dag = resolver.resolve() + + return dag_depdendencies_by_dag @staticmethod @provide_session diff --git a/airflow/serialization/dag_dependency.py b/airflow/serialization/dag_dependency.py index bede95ba9235b..745a487eed42a 100644 --- a/airflow/serialization/dag_dependency.py +++ b/airflow/serialization/dag_dependency.py @@ -17,6 +17,7 @@ from __future__ import annotations from dataclasses import dataclass +from typing import Literal @dataclass(frozen=True, order=True) @@ -29,7 +30,8 @@ class DagDependency: source: str target: str - dependency_type: str + label: str + dependency_type: Literal["asset", "asset-name-ref", "asset-uri-ref", "asset-alias", "trigger", "sensor"] dependency_id: str | None = None @property diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 7113118d14b62..b01117b9ec61a 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1069,6 +1069,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.dag_id, target=getattr(task, "trigger_dag_id"), + label=task.task_display_name, dependency_type="trigger", dependency_id=task.task_id, ) @@ -1082,6 +1083,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.dag_id, target=task.partial_kwargs["trigger_dag_id"], + label=task.task_display_name, dependency_type="trigger", dependency_id=task.task_id, ) @@ -1091,6 +1093,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=getattr(task, "external_dag_id"), target=task.dag_id, + label=task.task_display_name, dependency_type="sensor", dependency_id=task.task_id, ) @@ -1104,22 +1107,26 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: DagDependency( source=task.partial_kwargs["external_dag_id"], target=task.dag_id, + label=task.task_display_name, dependency_type="sensor", dependency_id=task.task_id, ) ) + for obj in task.outlets or []: if isinstance(obj, Asset): deps.append( DagDependency( source=task.dag_id, target="asset", + label=obj.name, dependency_type="asset", - dependency_id=obj.name, + dependency_id=AssetUniqueKey.from_asset(obj).to_str(), ) ) elif isinstance(obj, AssetAlias): deps.extend(obj.iter_dag_dependencies(source=task.dag_id, target="")) + return deps @staticmethod diff --git a/airflow/ui/src/pages/Asset/AssetGraph.tsx b/airflow/ui/src/pages/Asset/AssetGraph.tsx index 1fa904017eca0..72d52fbc10a42 100644 --- a/airflow/ui/src/pages/Asset/AssetGraph.tsx +++ b/airflow/ui/src/pages/Asset/AssetGraph.tsx @@ -41,7 +41,7 @@ export const AssetGraph = ({ asset }: { readonly asset?: AssetResponse }) => { const { colorMode = "light" } = useColorMode(); const { data = { edges: [], nodes: [] } } = useDependenciesServiceGetDependencies( - { nodeId: `asset:${asset?.name}` }, + { nodeId: `asset:${asset?.id}` }, undefined, { enabled: Boolean(asset) && Boolean(asset?.name) }, ); diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py index 24ee60a5f5709..fc1685b68b61f 100644 --- a/airflow/utils/dot_renderer.py +++ b/airflow/utils/dot_renderer.py @@ -179,8 +179,12 @@ def render_dag_dependencies(deps: dict[str, list[DagDependency]]) -> graphviz.Di "label": dag, }, ) as dep_subgraph: - dep_subgraph.edge(dep.source, dep.dependency_id) - dep_subgraph.edge(dep.dependency_id, dep.target) + leaf_nodes = ("asset", "asset-name-ref", "asset-uri-ref", "asset-alias") + if dep.source not in leaf_nodes: + dep_subgraph.edge(dep.source, dep.dependency_id) + + if dep.target not in leaf_nodes: + dep_subgraph.edge(dep.dependency_id, dep.target) return dot diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 41e8cfd637df4..43f0d7c2c1359 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -17,12 +17,13 @@ from __future__ import annotations +import json import logging import operator import os import urllib.parse import warnings -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Union, overload +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Literal, Union, overload import attrs @@ -77,6 +78,13 @@ def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: def to_asset(self) -> Asset: return Asset(name=self.name, uri=self.uri) + @staticmethod + def from_str(key: str) -> AssetUniqueKey: + return AssetUniqueKey(**json.loads(key)) + + def to_str(self) -> str: + return json.dumps(attrs.asdict(self)) + @attrs.define(frozen=True) class AssetAliasUniqueKey: @@ -445,8 +453,11 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe yield DagDependency( source=source or "asset", target=target or "asset", + label=self.name, dependency_type="asset", - dependency_id=self.name, + # We can't get asset id at this stage. + # This will be updated when running SerializedDagModel.get_dag_dependencies + dependency_id=AssetUniqueKey.from_asset(self).to_str(), ) def asprofile(self) -> AssetProfile: @@ -468,6 +479,8 @@ class AssetRef(BaseAsset, AttrsInstance): :meta private: """ + _dependency_type: Literal["asset-name-ref", "asset-uri-ref"] + def as_expression(self) -> Any: return {"asset_ref": attrs.asdict(self)} @@ -483,9 +496,10 @@ def iter_asset_refs(self) -> Iterator[AssetRef]: def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]: (dependency_id,) = attrs.astuple(self) yield DagDependency( - source=source or "asset-ref", - target=target or "asset-ref", - dependency_type="asset-ref", + source=source or self._dependency_type, + target=target or self._dependency_type, + label=dependency_id, + dependency_type=self._dependency_type, dependency_id=dependency_id, ) @@ -496,6 +510,8 @@ class AssetNameRef(AssetRef): name: str + _dependency_type = "asset-name-ref" + @attrs.define(hash=True) class AssetUriRef(AssetRef): @@ -503,6 +519,8 @@ class AssetUriRef(AssetRef): uri: str + _dependency_type = "asset-uri-ref" + class Dataset(Asset): """A representation of dataset dependencies between workflows.""" @@ -549,6 +567,7 @@ def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterat yield DagDependency( source=source or "asset-alias", target=target or "asset-alias", + label=self.name, dependency_type="asset-alias", dependency_id=self.name, ) diff --git a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py index efa9ff99b11c4..4855961195b10 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_dependencies.py +++ b/tests/api_fastapi/core_api/routes/ui/test_dependencies.py @@ -18,7 +18,9 @@ import pendulum import pytest +from sqlalchemy import select +from airflow.models.asset import AssetModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor @@ -36,7 +38,12 @@ def cleanup(): @pytest.fixture -def make_primary_connected_component(dag_maker): +def asset1() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + + +@pytest.fixture +def make_primary_connected_component(dag_maker, asset1): with dag_maker( dag_id="external_trigger_dag_id", serialized=True, @@ -47,13 +54,12 @@ def make_primary_connected_component(dag_maker): with dag_maker(dag_id="other_dag", serialized=True): EmptyOperator(task_id="task1") - asset = Asset(uri="s3://bucket/next-run-asset/1", name="asset1") with dag_maker(dag_id="upstream", serialized=True): - EmptyOperator(task_id="task2", outlets=[asset]) + EmptyOperator(task_id="task2", outlets=[asset1]) with dag_maker( dag_id="downstream", - schedule=[asset], + schedule=[asset1], serialized=True, ): EmptyOperator(task_id="task1") >> ExternalTaskSensor( @@ -64,11 +70,18 @@ def make_primary_connected_component(dag_maker): @pytest.fixture -def expected_primary_component_response(): +def asset1_id(make_primary_connected_component, asset1, session) -> int: + return session.scalar( + select(AssetModel.id).where(AssetModel.name == asset1.name, AssetModel.uri == asset1.uri) + ) + + +@pytest.fixture +def expected_primary_component_response(asset1_id): return { "edges": [ { - "source_id": "asset:asset1", + "source_id": f"asset:{asset1_id}", "target_id": "dag:downstream", }, { @@ -81,7 +94,7 @@ def expected_primary_component_response(): }, { "source_id": "dag:upstream", - "target_id": "asset:asset1", + "target_id": f"asset:{asset1_id}", }, { "source_id": "sensor:other_dag:downstream:external_task_sensor", @@ -99,7 +112,7 @@ def expected_primary_component_response(): "type": "dag", }, { - "id": "asset:asset1", + "id": f"asset:{asset1_id}", "label": "asset1", "type": "asset", }, @@ -128,15 +141,18 @@ def expected_primary_component_response(): @pytest.fixture -def make_secondary_connected_component(dag_maker): - asset = Asset(uri="s3://bucket/next-run-asset/2", name="asset2") +def asset2() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + +@pytest.fixture +def make_secondary_connected_component(dag_maker, asset2): with dag_maker(dag_id="upstream_secondary", serialized=True): - EmptyOperator(task_id="task2", outlets=[asset]) + EmptyOperator(task_id="task2", outlets=[asset2]) with dag_maker( dag_id="downstream_secondary", - schedule=[asset], + schedule=[asset2], serialized=True, ): EmptyOperator(task_id="task1") @@ -145,16 +161,23 @@ def make_secondary_connected_component(dag_maker): @pytest.fixture -def expected_secondary_component_response(): +def asset2_id(make_secondary_connected_component, asset2, session) -> int: + return session.scalar( + select(AssetModel.id).where(AssetModel.name == asset2.name, AssetModel.uri == asset2.uri) + ) + + +@pytest.fixture +def expected_secondary_component_response(asset2_id): return { "edges": [ { - "source_id": "asset:asset2", + "source_id": f"asset:{asset2_id}", "target_id": "dag:downstream_secondary", }, { "source_id": "dag:upstream_secondary", - "target_id": "asset:asset2", + "target_id": f"asset:{asset2_id}", }, ], "nodes": [ @@ -164,7 +187,7 @@ def expected_secondary_component_response(): "type": "dag", }, { - "id": "asset:asset2", + "id": f"asset:{asset2_id}", "label": "asset2", "type": "asset", }, @@ -199,7 +222,6 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): "node_id, expected_response_fixture", [ # Primary Component - ("asset:asset1", "expected_primary_component_response"), ("dag:downstream", "expected_primary_component_response"), ("sensor:other_dag:downstream:external_task_sensor", "expected_primary_component_response"), ("dag:external_trigger_dag_id", "expected_primary_component_response"), @@ -209,7 +231,6 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client): ), ("dag:upstream", "expected_primary_component_response"), # Secondary Component - ("asset:asset2", "expected_secondary_component_response"), ("dag:downstream_secondary", "expected_secondary_component_response"), ("dag:upstream_secondary", "expected_secondary_component_response"), ], @@ -222,6 +243,23 @@ def test_with_node_id_filter(self, test_client, node_id, expected_response_fixtu assert response.json() == expected_response + def test_with_node_id_filter_with_asset( + self, + test_client, + asset1_id, + asset2_id, + expected_primary_component_response, + expected_secondary_component_response, + ): + for asset_id, expected_response in ( + (asset1_id, expected_primary_component_response), + (asset2_id, expected_secondary_component_response), + ): + response = test_client.get("/ui/dependencies", params={"node_id": f"asset:{asset_id}"}) + assert response.status_code == 200 + + assert response.json() == expected_response + @pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component") def test_with_node_id_filter_not_found(self, test_client): response = test_client.get("/ui/dependencies", params={"node_id": "missing_node_id"}) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index e3512f6b02698..3a97177460dfa 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -21,8 +21,11 @@ import pendulum import pytest +from sqlalchemy import select +from sqlalchemy.orm import Session from airflow.models import DagBag +from airflow.models.asset import AssetModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor @@ -83,7 +86,7 @@ @pytest.fixture(autouse=True, scope="module") -def examples_dag_bag(): +def examples_dag_bag() -> DagBag: # Speed up: We don't want example dags for this module return DagBag(include_examples=False, read_dags_from_db=True) @@ -97,7 +100,22 @@ def clean(): @pytest.fixture -def make_dag(dag_maker, session, time_machine): +def asset1() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + + +@pytest.fixture +def asset2() -> Asset: + return Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + + +@pytest.fixture +def asset3() -> Dataset: + return Dataset(uri="s3://dataset-bucket/example.csv") + + +@pytest.fixture +def make_dag(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, asset3: Dataset) -> None: with dag_maker( dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, @@ -113,14 +131,10 @@ def make_dag(dag_maker, session, time_machine): serialized=True, session=session, start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), - schedule=( - Asset(uri="s3://bucket/next-run-asset/1", name="asset1") - & Asset(uri="s3://bucket/next-run-asset/2", name="asset2") - & AssetAlias("example-alias") - ), + schedule=(asset1 & asset2 & AssetAlias("example-alias")), ): ( - EmptyOperator(task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")]) + EmptyOperator(task_id="task_1", outlets=[asset3]) >> ExternalTaskSensor(task_id="external_task_sensor", external_dag_id=DAG_ID) >> EmptyOperator(task_id="task_2") ) @@ -128,6 +142,29 @@ def make_dag(dag_maker, session, time_machine): dag_maker.sync_dagbag_to_db() +def _fetch_asset_id(asset: Asset, session: Session) -> str: + return str( + session.scalar( + select(AssetModel.id).where(AssetModel.name == asset.name, AssetModel.uri == asset.uri) + ) + ) + + +@pytest.fixture +def asset1_id(make_dag, asset1, session: Session) -> str: + return _fetch_asset_id(asset1, session) + + +@pytest.fixture +def asset2_id(make_dag, asset2, session) -> str: + return _fetch_asset_id(asset2, session) + + +@pytest.fixture +def asset3_id(make_dag, asset3, session) -> str: + return _fetch_asset_id(asset3, session) + + class TestStructureDataEndpoint: @pytest.mark.parametrize( "params, expected", @@ -219,191 +256,6 @@ class TestStructureDataEndpoint: ], }, ), - ( - { - "dag_id": DAG_ID, - "external_dependencies": True, - }, - { - "edges": [ - { - "is_setup_teardown": None, - "label": None, - "source_id": "and-gate-0", - "target_id": "task_1", - "is_source_asset": True, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "asset1", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "asset2", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "example-alias", - "target_id": "and-gate-0", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", - "target_id": "task_1", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", - "target_id": "task_1", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "external_task_sensor", - "target_id": "task_2", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_1", - "target_id": "external_task_sensor", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_2", - "target_id": "asset:s3://dataset-bucket/example.csv", - "is_source_asset": None, - }, - ], - "nodes": [ - { - "children": None, - "id": "task_1", - "is_mapped": None, - "label": "task_1", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "EmptyOperator", - "asset_condition_type": None, - }, - { - "children": None, - "id": "external_task_sensor", - "is_mapped": None, - "label": "external_task_sensor", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "ExternalTaskSensor", - "asset_condition_type": None, - }, - { - "children": None, - "id": "task_2", - "is_mapped": None, - "label": "task_2", - "tooltip": None, - "setup_teardown_type": None, - "type": "task", - "operator": "EmptyOperator", - "asset_condition_type": None, - }, - { - "children": None, - "id": "asset:s3://dataset-bucket/example.csv", - "is_mapped": None, - "label": "s3://dataset-bucket/example.csv", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", - "is_mapped": None, - "label": "external_task_sensor", - "tooltip": None, - "setup_teardown_type": None, - "type": "sensor", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", - "is_mapped": None, - "label": "trigger_dag_run_operator", - "tooltip": None, - "setup_teardown_type": None, - "type": "trigger", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "and-gate-0", - "is_mapped": None, - "label": "and-gate-0", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset-condition", - "operator": None, - "asset_condition_type": "and-gate", - }, - { - "children": None, - "id": "asset1", - "is_mapped": None, - "label": "asset1", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "asset2", - "is_mapped": None, - "label": "asset2", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset", - "operator": None, - "asset_condition_type": None, - }, - { - "children": None, - "id": "example-alias", - "is_mapped": None, - "label": "example-alias", - "tooltip": None, - "setup_teardown_type": None, - "type": "asset-alias", - "operator": None, - "asset_condition_type": None, - }, - ], - }, - ), ( {"dag_id": DAG_ID_EXTERNAL_TRIGGER, "external_dependencies": True}, { @@ -450,6 +302,196 @@ def test_should_return_200(self, test_client, params, expected): assert response.status_code == 200 assert response.json() == expected + @pytest.mark.usefixtures("make_dag") + def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, asset3_id): + params = { + "dag_id": DAG_ID, + "external_dependencies": True, + } + expected = { + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "and-gate-0", + "target_id": "task_1", + "is_source_asset": True, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": asset1_id, + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": asset2_id, + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "example-alias", + "target_id": "and-gate-0", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", + "target_id": "task_1", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", + "target_id": "task_1", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "external_task_sensor", + "target_id": "task_2", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_2", + "target_id": f"asset:{asset3_id}", + "is_source_asset": None, + }, + ], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + "asset_condition_type": None, + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "ExternalTaskSensor", + "asset_condition_type": None, + }, + { + "children": None, + "id": "task_2", + "is_mapped": None, + "label": "task_2", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + "asset_condition_type": None, + }, + { + "children": None, + "id": f"asset:{asset3_id}", + "is_mapped": None, + "label": "s3://dataset-bucket/example.csv", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "sensor:dag_with_multiple_versions:dag_with_multiple_versions:external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "sensor", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "trigger:external_trigger:dag_with_multiple_versions:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "and-gate-0", + "is_mapped": None, + "label": "and-gate-0", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset-condition", + "operator": None, + "asset_condition_type": "and-gate", + }, + { + "children": None, + "id": asset1_id, + "is_mapped": None, + "label": "asset1", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": asset2_id, + "is_mapped": None, + "label": "asset2", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + "asset_condition_type": None, + }, + { + "children": None, + "id": "example-alias", + "is_mapped": None, + "label": "example-alias", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset-alias", + "operator": None, + "asset_condition_type": None, + }, + ], + } + + response = test_client.get("/ui/structure/structure_data", params=params) + assert response.status_code == 200 + assert response.json() == expected + @pytest.mark.parametrize( "params, expected", [ diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index f3fbe1ac93494..84fee3cb0a362 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -27,6 +27,7 @@ import airflow.example_dags as example_dags_module from airflow.decorators import task as task_decorator +from airflow.models.asset import AssetModel from airflow.models.dag import DAG, DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DagBag @@ -233,12 +234,19 @@ def test_order_of_dag_params_is_stable(self): assert before == after - def test_order_of_deps_is_consistent(self): + @pytest.mark.db_test + def test_order_of_deps_is_consistent(self, session): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. This caused the order, and thus the hash value, to be unreliable, which could produce excessive dag parsing. """ + + db.clear_db_assets() + session.add_all([AssetModel(id=i, uri=f"test://asset{i}/", name=f"{i}") for i in range(1, 6)]) + session.add_all([AssetModel(id=i, uri=f"test://asset{i}/", name=f"{i}*") for i in (0, 6)]) + session.commit() + first_dag_hash = None for _ in range(10): with DAG( @@ -257,7 +265,7 @@ def test_order_of_deps_is_consistent(self): outlets=[Asset(uri="test://asset0", name="0*"), Asset(uri="test://asset6", name="6*")], bash_command="sleep 5", ) - deps_order = [x["dependency_id"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]] + deps_order = [x["label"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]] # in below assert, 0 and 6 both come at end because "source" is different for them and source # is the first field in DagDependency class assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"] @@ -272,6 +280,7 @@ def test_order_of_deps_is_consistent(self): # dag hash should not change without change in structure (we're in a loop) assert this_dag_hash == first_dag_hash + db.clear_db_assets() def test_example_dag_hashes_are_always_consistent(self, session): """ diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index d69ab7d0b817c..ede6fe277cfd0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -56,6 +56,7 @@ SerializationError, ) from airflow.hooks.base import BaseHook +from airflow.models.asset import AssetModel from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG @@ -66,7 +67,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor -from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.asset import Asset, AssetUniqueKey from airflow.sdk.definitions.param import Param, ParamsDict from airflow.security import permissions from airflow.serialization.enums import Encoding @@ -267,6 +268,20 @@ } +@pytest.fixture +def testing_assets(session): + from tests_common.test_utils.db import clear_db_assets + + assets = [Asset(name=f"asset{i}", uri=f"test://asset{i}/") for i in range(1, 5)] + + session.add_all([AssetModel(id=i, name=f"asset{i}", uri=f"test://asset{i}/") for i in range(1, 5)]) + session.commit() + + yield assets + + clear_db_assets() + + def make_simple_dag(): """Make very simple DAG to verify serialization result.""" with DAG( @@ -1666,26 +1681,21 @@ class DerivedSensor(ExternalTaskSensor): { "source": "external_dag_id", "target": "test_derived_dag_deps_sensor", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", } ] @pytest.mark.db_test - def test_dag_deps_assets_with_duplicate_asset(self): + def test_dag_deps_assets_with_duplicate_asset(self, testing_assets): """ Check that dag_dependencies node is populated correctly for a DAG with duplicate assets. """ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor - asset1 = Asset(name="asset1", uri="test://asset1") - asset2 = Asset(name="asset2", uri="test://asset2") - asset3 = Asset(name="asset3", uri="test://asset3") - asset4 = Asset(name="asset4", uri="test://asset4") logical_date = datetime(2020, 1, 1) - with DAG( - dag_id="test", start_date=logical_date, schedule=[asset1, asset1, asset1, asset1, asset1] - ) as dag: + with DAG(dag_id="test", start_date=logical_date, schedule=[testing_assets[0]] * 5) as dag: ExternalTaskSensor( task_id="task1", external_dag_id="external_dag_id", @@ -1694,15 +1704,17 @@ def test_dag_deps_assets_with_duplicate_asset(self): BashOperator( task_id="asset_writer", bash_command="echo hello", - outlets=[asset2, asset2, asset2, asset3], + outlets=[testing_assets[1]] * 3 + testing_assets[2:3], ) - @dag.task(outlets=[asset4]) + @dag.task(outlets=[testing_assets[3]]) def other_asset_writer(x): pass other_asset_writer.expand(x=[1, 2]) + testing_asset_key_strs = [AssetUniqueKey.from_asset(asset).to_str() for asset in testing_assets] + dag = SerializedDAG.to_dict(dag) actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) expected = sorted( @@ -1710,56 +1722,65 @@ def other_asset_writer(x): { "source": "test", "target": "asset", + "label": "asset4", "dependency_type": "asset", - "dependency_id": "asset4", + "dependency_id": testing_asset_key_strs[3], }, { "source": "external_dag_id", "target": "test", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", }, { "source": "test", "target": "asset", + "label": "asset3", "dependency_type": "asset", - "dependency_id": "asset3", + "dependency_id": testing_asset_key_strs[2], }, { "source": "test", "target": "asset", + "label": "asset2", "dependency_type": "asset", - "dependency_id": "asset2", + "dependency_id": testing_asset_key_strs[1], }, { "source": "asset", "target": "test", + "label": "asset1", "dependency_type": "asset", - "dependency_id": "asset1", + "dependency_id": testing_asset_key_strs[0], }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": testing_asset_key_strs[0], }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": testing_asset_key_strs[0], }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": testing_asset_key_strs[0], }, { - "dependency_id": "asset1", - "dependency_type": "asset", "source": "asset", "target": "test", + "label": "asset1", + "dependency_type": "asset", + "dependency_id": testing_asset_key_strs[0], }, ], key=lambda x: tuple(x.values()), @@ -1767,31 +1788,32 @@ def other_asset_writer(x): assert actual == expected @pytest.mark.db_test - def test_dag_deps_assets(self): + def test_dag_deps_assets(self, testing_assets): """ Check that dag_dependencies node is populated correctly for a DAG with assets. + + Note that asset id will not be stored at this stage and will be later evaluated when + calling SerializedDagModel.get_dag_dependencies. """ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor - asset1 = Asset(name="asset1", uri="test://asset1") - asset2 = Asset(name="asset2", uri="test://asset2") - asset3 = Asset(name="asset3", uri="test://asset3") - asset4 = Asset(name="asset4", uri="test://asset4") logical_date = datetime(2020, 1, 1) - with DAG(dag_id="test", start_date=logical_date, schedule=[asset1]) as dag: + with DAG(dag_id="test", start_date=logical_date, schedule=testing_assets[0:1]) as dag: ExternalTaskSensor( task_id="task1", external_dag_id="external_dag_id", mode="reschedule", ) - BashOperator(task_id="asset_writer", bash_command="echo hello", outlets=[asset2, asset3]) + BashOperator(task_id="asset_writer", bash_command="echo hello", outlets=testing_assets[1:3]) - @dag.task(outlets=[asset4]) + @dag.task(outlets=testing_assets[3:]) def other_asset_writer(x): pass other_asset_writer.expand(x=[1, 2]) + testing_asset_key_strs = [AssetUniqueKey.from_asset(asset).to_str() for asset in testing_assets] + dag = SerializedDAG.to_dict(dag) actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) expected = sorted( @@ -1799,32 +1821,37 @@ def other_asset_writer(x): { "source": "test", "target": "asset", + "label": "asset4", "dependency_type": "asset", - "dependency_id": "asset4", + "dependency_id": testing_asset_key_strs[3], }, { "source": "external_dag_id", "target": "test", + "label": "task1", "dependency_type": "sensor", "dependency_id": "task1", }, { "source": "test", "target": "asset", + "label": "asset3", "dependency_type": "asset", - "dependency_id": "asset3", + "dependency_id": testing_asset_key_strs[2], }, { "source": "test", "target": "asset", + "label": "asset2", "dependency_type": "asset", - "dependency_id": "asset2", + "dependency_id": testing_asset_key_strs[1], }, { "source": "asset", "target": "test", + "label": "asset1", "dependency_type": "asset", - "dependency_id": "asset1", + "dependency_id": testing_asset_key_strs[0], }, ], key=lambda x: tuple(x.values()), @@ -1869,6 +1896,7 @@ class DerivedOperator(TriggerDagRunOperator): { "source": "test_derived_dag_deps_trigger", "target": "trigger_dag_id", + "label": "task2", "dependency_type": "trigger", "dependency_id": "task2", } diff --git a/tests/utils/test_dot_renderer.py b/tests/utils/test_dot_renderer.py index 9cffec7a8951c..26826f8d50140 100644 --- a/tests/utils/test_dot_renderer.py +++ b/tests/utils/test_dot_renderer.py @@ -51,15 +51,21 @@ def teardown_method(self): def test_should_render_dag_dependencies(self): dag_dep_1 = DagDependency( - source="dag_one", target="dag_two", dependency_type="Sensor", dependency_id="task_1" + source="dag_one", + target="dag_two", + label="task_1", + dependency_type="sensor", + dependency_id="task_1", ) dag_dep_2 = DagDependency( - source="dag_two", target="dag_three", dependency_type="Sensor", dependency_id="task_2" + source="dag_two", + target="dag_three", + label="task_2", + dependency_type="sensor", + dependency_id="task_2", ) - dag_dependency_list = [] - dag_dependency_list.append(dag_dep_1) - dag_dependency_list.append(dag_dep_2) + dag_dependency_list = [dag_dep_1, dag_dep_2] dag_dependency_dict = {} dag_dependency_dict["dag_one"] = dag_dependency_list