diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py index 48b55f2f1e684..75fd131d2a2ff 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py @@ -23,12 +23,133 @@ from __future__ import annotations import enum +import logging from typing import Annotated, Any, Generic, Literal, TypeVar, Union -from pydantic import Discriminator, Field, Tag +from pydantic import BeforeValidator, Discriminator, Field, Tag, TypeAdapter, ValidationError from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel +log = logging.getLogger(__name__) + +# Asset Scheduling Expression Data Models +# +# These mirror the JSON produced by ``BaseAsset.as_expression()`` (see +# ``airflow.serialization.definitions.assets``), which is stored verbatim in +# ``DagModel.asset_expression``. Declaring them gives the REST API -- and the +# TypeScript client generated from its OpenAPI spec -- a real type instead of an +# opaque ``dict``. The shape is a recursive boolean tree whose leaves are assets, +# asset aliases, or asset references. + + +class AssetExpressionAssetInfo(BaseModel): + """ + Body of an ``asset`` leaf node. + + ``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when the expression is + persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. It is left + optional so a row persisted before id-enrichment (or migrated from the pre-3.0 dataset format) + degrades gracefully instead of failing response validation. + """ + + uri: str + name: str + group: str + id: int | None = None + + +class AssetExpressionAliasInfo(BaseModel): + """Body of an ``alias`` leaf node.""" + + name: str + group: str + + +class AssetExpressionAsset(BaseModel): + """An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": ...}}``.""" + + asset: AssetExpressionAssetInfo + + +class AssetExpressionAlias(BaseModel): + """An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``.""" + + alias: AssetExpressionAliasInfo + + +class AssetExpressionRef(BaseModel): + """An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` or ``{"asset_ref": {"uri": ...}}``.""" + + asset_ref: dict[str, str] + + +class AssetExpressionAny(BaseModel): + """An "or" node: ``{"any": [...]}`` -- satisfied when any child is satisfied.""" + + any: list[AssetExpression] + + +class AssetExpressionAll(BaseModel): + """An "and" node: ``{"all": [...]}`` -- satisfied when all children are satisfied.""" + + all: list[AssetExpression] + + +def _asset_expression_discriminator(value: Any) -> str | None: + """Select an expression variant by the single key that is present.""" + keys = ("asset", "alias", "asset_ref", "any", "all") + if isinstance(value, dict): + present = [key for key in keys if key in value] + else: + present = [key for key in keys if getattr(value, key, None) is not None] + return present[0] if len(present) == 1 else None + + +AssetExpression = Annotated[ + Union[ + Annotated[AssetExpressionAsset, Tag("asset")], + Annotated[AssetExpressionAlias, Tag("alias")], + Annotated[AssetExpressionRef, Tag("asset_ref")], + Annotated[AssetExpressionAny, Tag("any")], + Annotated[AssetExpressionAll, Tag("all")], + ], + Discriminator(_asset_expression_discriminator), +] +"""A nested asset scheduling expression; see ``BaseAsset.as_expression()``.""" + +AssetExpressionAny.model_rebuild() +AssetExpressionAll.model_rebuild() + +_asset_expression_adapter: TypeAdapter = TypeAdapter(AssetExpression) + + +def _coerce_unrecognized_expression_to_none(value: Any) -> Any: + """ + Degrade an unrecognized ``asset_expression`` to ``None`` instead of failing response validation. + + ``DagModel.asset_expression`` is stored verbatim from ``BaseAsset.as_expression()`` and is rewritten + to the current shape whenever a Dag is parsed (``DagModelOperation.update_dag_asset_expression``). A + row written by the pre-3.0 dataset scheduler and not yet re-parsed can still hold a legacy shape -- + a bare uri string, ``{"any": ["s3://..."]}``, or ``{"alias": ""}`` -- that the typed model does + not recognise. Serving such a row as ``None`` reproduces the blank render the UI showed while this + field was an untyped ``dict``, rather than turning stored data into an HTTP 500. + """ + if value is None: + return None + try: + _asset_expression_adapter.validate_python(value) + except ValidationError: + log.warning("Dropping unrecognized asset_expression shape to None: %r", value) + return None + return value + + +MaybeAssetExpression = Annotated[ + Union[AssetExpression, None], + BeforeValidator(_coerce_unrecognized_expression_to_none), +] +"""``AssetExpression | None`` that degrades a legacy/unrecognized stored shape to ``None``.""" + # Common Bulk Data Models T = TypeVar("T") K = TypeVar("K") diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py index dea97c4635281..31c9d24fa1659 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py @@ -35,6 +35,7 @@ from airflow._shared.module_loading import qualname from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel, make_partial_model +from airflow.api_fastapi.core_api.datamodels.common import MaybeAssetExpression from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse from airflow.configuration import conf @@ -191,7 +192,7 @@ class DAGDetailsResponse(DAGResponse): catchup: bool dag_run_timeout: timedelta | None - asset_expression: dict | None + asset_expression: MaybeAssetExpression doc_md: str | None start_date: datetime | None end_date: datetime | None diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/assets.py index 446c0659361a9..54847eae467c5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/assets.py @@ -21,6 +21,7 @@ from pydantic import Field from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.common import MaybeAssetExpression class NextRunAssetEventResponse(BaseModel): @@ -49,6 +50,6 @@ class NextRunAssetEventResponse(BaseModel): class NextRunAssetsResponse(BaseModel): """Response for the ``next_run_assets`` endpoint.""" - asset_expression: dict | None = None + asset_expression: MaybeAssetExpression = None events: list[NextRunAssetEventResponse] pending_partition_count: int | None = None diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py index fc2fe3ed2d2aa..febdf45c0a8bb 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py @@ -18,6 +18,7 @@ from __future__ import annotations from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.common import MaybeAssetExpression from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse from airflow.api_fastapi.core_api.datamodels.hitl import HITLDetail from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse @@ -26,7 +27,7 @@ class DAGWithLatestDagRunsResponse(DAGResponse): """DAG with latest dag runs response serializer.""" - asset_expression: dict | None + asset_expression: MaybeAssetExpression latest_dag_runs: list[DAGRunLightResponse] pending_actions: list[HITLDetail] is_favorite: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py index b7cbdc705663c..79a284b3f3f01 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py @@ -17,6 +17,7 @@ from __future__ import annotations from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.common import MaybeAssetExpression class PartitionedDagRunResponse(BaseModel): @@ -37,7 +38,7 @@ class PartitionedDagRunCollectionResponse(BaseModel): partitioned_dag_runs: list[PartitionedDagRunResponse] total: int - asset_expressions: dict[str, dict | None] | None = None + asset_expressions: dict[str, MaybeAssetExpression] | None = None class PartitionedDagRunAssetResponse(BaseModel): @@ -75,4 +76,4 @@ class PartitionedDagRunDetailResponse(BaseModel): assets: list[PartitionedDagRunAssetResponse] total_required: int total_received: int - asset_expression: dict | None = None + asset_expression: MaybeAssetExpression = None diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 5d51f7b748037..5a62197dcfcfe 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1736,6 +1736,123 @@ paths: $ref: '#/components/schemas/HTTPValidationError' components: schemas: + AssetExpressionAlias: + properties: + alias: + $ref: '#/components/schemas/AssetExpressionAliasInfo' + type: object + required: + - alias + title: AssetExpressionAlias + description: 'An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``.' + AssetExpressionAliasInfo: + properties: + name: + type: string + title: Name + group: + type: string + title: Group + type: object + required: + - name + - group + title: AssetExpressionAliasInfo + description: Body of an ``alias`` leaf node. + AssetExpressionAll: + properties: + all: + items: + oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' + type: array + title: All + type: object + required: + - all + title: AssetExpressionAll + description: 'An "and" node: ``{"all": [...]}`` -- satisfied when all children + are satisfied.' + AssetExpressionAny: + properties: + any: + items: + oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' + type: array + title: Any + type: object + required: + - any + title: AssetExpressionAny + description: 'An "or" node: ``{"any": [...]}`` -- satisfied when any child is + satisfied.' + AssetExpressionAsset: + properties: + asset: + $ref: '#/components/schemas/AssetExpressionAssetInfo' + type: object + required: + - asset + title: AssetExpressionAsset + description: 'An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": + ...}}``.' + AssetExpressionAssetInfo: + properties: + uri: + type: string + title: Uri + name: + type: string + title: Name + group: + type: string + title: Group + id: + anyOf: + - type: integer + - type: 'null' + title: Id + type: object + required: + - uri + - name + - group + title: AssetExpressionAssetInfo + description: 'Body of an ``asset`` leaf node. + + + ``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when + the expression is + + persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. + It is left + + optional so a row persisted before id-enrichment (or migrated from the pre-3.0 + dataset format) + + degrades gracefully instead of failing response validation.' + AssetExpressionRef: + properties: + asset_ref: + additionalProperties: + type: string + type: object + title: Asset Ref + type: object + required: + - asset_ref + title: AssetExpressionRef + description: 'An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` + or ``{"asset_ref": {"uri": ...}}``.' AuthenticatedMeResponse: properties: id: @@ -2295,8 +2412,12 @@ components: title: Owners asset_expression: anyOf: - - additionalProperties: true - type: object + - oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' - type: 'null' title: Asset Expression latest_dag_runs: @@ -3239,8 +3360,12 @@ components: properties: asset_expression: anyOf: - - additionalProperties: true - type: object + - oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' - type: 'null' title: Asset Expression events: @@ -3400,8 +3525,12 @@ components: anyOf: - additionalProperties: anyOf: - - additionalProperties: true - type: object + - oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' - type: 'null' type: object - type: 'null' @@ -3451,8 +3580,12 @@ components: title: Total Received asset_expression: anyOf: - - additionalProperties: true - type: object + - oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' - type: 'null' title: Asset Expression type: object diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index ea6f0a70fcd0a..44f797e1d3c36 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11624,6 +11624,123 @@ components: - timestamp title: AssetEventResponse description: Asset event serializer for responses. + AssetExpressionAlias: + properties: + alias: + $ref: '#/components/schemas/AssetExpressionAliasInfo' + type: object + required: + - alias + title: AssetExpressionAlias + description: 'An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``.' + AssetExpressionAliasInfo: + properties: + name: + type: string + title: Name + group: + type: string + title: Group + type: object + required: + - name + - group + title: AssetExpressionAliasInfo + description: Body of an ``alias`` leaf node. + AssetExpressionAll: + properties: + all: + items: + oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' + type: array + title: All + type: object + required: + - all + title: AssetExpressionAll + description: 'An "and" node: ``{"all": [...]}`` -- satisfied when all children + are satisfied.' + AssetExpressionAny: + properties: + any: + items: + oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' + type: array + title: Any + type: object + required: + - any + title: AssetExpressionAny + description: 'An "or" node: ``{"any": [...]}`` -- satisfied when any child is + satisfied.' + AssetExpressionAsset: + properties: + asset: + $ref: '#/components/schemas/AssetExpressionAssetInfo' + type: object + required: + - asset + title: AssetExpressionAsset + description: 'An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": + ...}}``.' + AssetExpressionAssetInfo: + properties: + uri: + type: string + title: Uri + name: + type: string + title: Name + group: + type: string + title: Group + id: + anyOf: + - type: integer + - type: 'null' + title: Id + type: object + required: + - uri + - name + - group + title: AssetExpressionAssetInfo + description: 'Body of an ``asset`` leaf node. + + + ``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when + the expression is + + persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. + It is left + + optional so a row persisted before id-enrichment (or migrated from the pre-3.0 + dataset format) + + degrades gracefully instead of failing response validation.' + AssetExpressionRef: + properties: + asset_ref: + additionalProperties: + type: string + type: object + title: Asset Ref + type: object + required: + - asset_ref + title: AssetExpressionRef + description: 'An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` + or ``{"asset_ref": {"uri": ...}}``.' AssetResponse: properties: id: @@ -13318,8 +13435,12 @@ components: title: Dag Run Timeout asset_expression: anyOf: - - additionalProperties: true - type: object + - oneOf: + - $ref: '#/components/schemas/AssetExpressionAsset' + - $ref: '#/components/schemas/AssetExpressionAlias' + - $ref: '#/components/schemas/AssetExpressionRef' + - $ref: '#/components/schemas/AssetExpressionAny' + - $ref: '#/components/schemas/AssetExpressionAll' - type: 'null' title: Asset Expression doc_md: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py index 4c0e34fd297c1..d3da95178a1db 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast import structlog from fastapi import Depends, HTTPException, status @@ -147,7 +147,8 @@ def next_run_assets( ) for row in raw_rows ] - return NextRunAssetsResponse(asset_expression=dag_model.asset_expression, events=events) + model_data: dict[str, Any] = {"asset_expression": dag_model.asset_expression, "events": events} + return NextRunAssetsResponse.model_validate(model_data) # Partitioned Dags: enrich with per-asset received/required counts and rollup flag. # FIFO matches the scheduler's pending-APDR processing order @@ -180,11 +181,12 @@ def next_run_assets( ) for row in raw_rows ] - return NextRunAssetsResponse( - asset_expression=dag_model.asset_expression, - events=events, - pending_partition_count=pending_partition_count, - ) + model_data = { + "asset_expression": dag_model.asset_expression, + "events": events, + "pending_partition_count": pending_partition_count, + } + return NextRunAssetsResponse.model_validate(model_data) # Collect received upstream partition keys per asset for this partition run. # Use a set to deduplicate: multiple events for the same key count as one. @@ -253,8 +255,9 @@ def next_run_assets( ) ) - return NextRunAssetsResponse( - asset_expression=dag_model.asset_expression, - events=events, - pending_partition_count=pending_partition_count, - ) + model_data = { + "asset_expression": dag_model.asset_expression, + "events": events, + "pending_partition_count": pending_partition_count, + } + return NextRunAssetsResponse.model_validate(model_data) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py index e7f949e92cf73..29b393070e35b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, NamedTuple, TypeAlias, cast +from typing import TYPE_CHECKING, Any, NamedTuple, TypeAlias, cast import structlog from fastapi import Depends, HTTPException, status @@ -334,11 +334,12 @@ def get_partitioned_dag_runs( if dag_id.value is None: asset_expressions = {dm.dag_id: dm.asset_expression for dm in dag_models.values()} - return PartitionedDagRunCollectionResponse( - partitioned_dag_runs=results, - total=len(results), - asset_expressions=asset_expressions, - ) + model_data: dict[str, Any] = { + "partitioned_dag_runs": results, + "total": len(results), + "asset_expressions": asset_expressions, + } + return PartitionedDagRunCollectionResponse.model_validate(model_data) @partitioned_dag_runs_router.get( @@ -463,15 +464,16 @@ def get_pending_partitioned_dag_run( total_required = sum(a.required_count for a in assets) asset_expression = dag_model.asset_expression if dag_model is not None else None - return PartitionedDagRunDetailResponse( - id=partitioned_dag_run.id, - dag_id=dag_id, - partition_key=partition_key, - created_at=partitioned_dag_run.created_at.isoformat() if partitioned_dag_run.created_at else None, - updated_at=partitioned_dag_run.updated_at.isoformat() if partitioned_dag_run.updated_at else None, - created_dag_run_id=partitioned_dag_run.created_dag_run_id, - assets=assets, - total_required=total_required, - total_received=total_received, - asset_expression=asset_expression, - ) + model_data: dict[str, Any] = { + "id": partitioned_dag_run.id, + "dag_id": dag_id, + "partition_key": partition_key, + "created_at": partitioned_dag_run.created_at.isoformat() if partitioned_dag_run.created_at else None, + "updated_at": partitioned_dag_run.updated_at.isoformat() if partitioned_dag_run.updated_at else None, + "created_dag_run_id": partitioned_dag_run.created_dag_run_id, + "assets": assets, + "total_required": total_required, + "total_received": total_received, + "asset_expression": asset_expression, + } + return PartitionedDagRunDetailResponse.model_validate(model_data) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 0f98b36e8c790..ebf27dbbb84ea 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -315,6 +315,164 @@ export const $AssetEventResponse = { description: 'Asset event serializer for responses.' } as const; +export const $AssetExpressionAlias = { + properties: { + alias: { + '$ref': '#/components/schemas/AssetExpressionAliasInfo' + } + }, + type: 'object', + required: ['alias'], + title: 'AssetExpressionAlias', + description: 'An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``.' +} as const; + +export const $AssetExpressionAliasInfo = { + properties: { + name: { + type: 'string', + title: 'Name' + }, + group: { + type: 'string', + title: 'Group' + } + }, + type: 'object', + required: ['name', 'group'], + title: 'AssetExpressionAliasInfo', + description: 'Body of an ``alias`` leaf node.' +} as const; + +export const $AssetExpressionAll = { + properties: { + all: { + items: { + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] + }, + type: 'array', + title: 'All' + } + }, + type: 'object', + required: ['all'], + title: 'AssetExpressionAll', + description: 'An "and" node: ``{"all": [...]}`` -- satisfied when all children are satisfied.' +} as const; + +export const $AssetExpressionAny = { + properties: { + any: { + items: { + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] + }, + type: 'array', + title: 'Any' + } + }, + type: 'object', + required: ['any'], + title: 'AssetExpressionAny', + description: 'An "or" node: ``{"any": [...]}`` -- satisfied when any child is satisfied.' +} as const; + +export const $AssetExpressionAsset = { + properties: { + asset: { + '$ref': '#/components/schemas/AssetExpressionAssetInfo' + } + }, + type: 'object', + required: ['asset'], + title: 'AssetExpressionAsset', + description: 'An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": ...}}``.' +} as const; + +export const $AssetExpressionAssetInfo = { + properties: { + uri: { + type: 'string', + title: 'Uri' + }, + name: { + type: 'string', + title: 'Name' + }, + group: { + type: 'string', + title: 'Group' + }, + id: { + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Id' + } + }, + type: 'object', + required: ['uri', 'name', 'group'], + title: 'AssetExpressionAssetInfo', + description: `Body of an \`\`asset\`\` leaf node. + +\`\`id\`\` is injected by \`\`DagModelOperation.update_dag_asset_expression\`\` when the expression is +persisted; \`\`BaseAsset.as_expression()\`\` itself only emits \`\`uri\`\`/\`\`name\`\`/\`\`group\`\`. It is left +optional so a row persisted before id-enrichment (or migrated from the pre-3.0 dataset format) +degrades gracefully instead of failing response validation.` +} as const; + +export const $AssetExpressionRef = { + properties: { + asset_ref: { + additionalProperties: { + type: 'string' + }, + type: 'object', + title: 'Asset Ref' + } + }, + type: 'object', + required: ['asset_ref'], + title: 'AssetExpressionRef', + description: 'An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` or ``{"asset_ref": {"uri": ...}}``.' +} as const; + export const $AssetResponse = { properties: { id: { @@ -2808,8 +2966,23 @@ export const $DAGDetailsResponse = { asset_expression: { anyOf: [ { - additionalProperties: true, - type: 'object' + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] }, { type: 'null' @@ -8993,8 +9166,23 @@ export const $DAGWithLatestDagRunsResponse = { asset_expression: { anyOf: [ { - additionalProperties: true, - type: 'object' + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] }, { type: 'null' @@ -9864,8 +10052,23 @@ export const $NextRunAssetsResponse = { asset_expression: { anyOf: [ { - additionalProperties: true, - type: 'object' + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] }, { type: 'null' @@ -10087,8 +10290,23 @@ export const $PartitionedDagRunCollectionResponse = { additionalProperties: { anyOf: [ { - additionalProperties: true, - type: 'object' + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] }, { type: 'null' @@ -10175,8 +10393,23 @@ export const $PartitionedDagRunDetailResponse = { asset_expression: { anyOf: [ { - additionalProperties: true, - type: 'object' + oneOf: [ + { + '$ref': '#/components/schemas/AssetExpressionAsset' + }, + { + '$ref': '#/components/schemas/AssetExpressionAlias' + }, + { + '$ref': '#/components/schemas/AssetExpressionRef' + }, + { + '$ref': '#/components/schemas/AssetExpressionAny' + }, + { + '$ref': '#/components/schemas/AssetExpressionAll' + } + ] }, { type: 'null' diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 81b5665a333d0..83e95991808a9 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -83,6 +83,66 @@ export type AssetEventResponse = { partition_key?: string | null; }; +/** + * An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``. + */ +export type AssetExpressionAlias = { + alias: AssetExpressionAliasInfo; +}; + +/** + * Body of an ``alias`` leaf node. + */ +export type AssetExpressionAliasInfo = { + name: string; + group: string; +}; + +/** + * An "and" node: ``{"all": [...]}`` -- satisfied when all children are satisfied. + */ +export type AssetExpressionAll = { + all: Array<(AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll)>; +}; + +/** + * An "or" node: ``{"any": [...]}`` -- satisfied when any child is satisfied. + */ +export type AssetExpressionAny = { + any: Array<(AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll)>; +}; + +/** + * An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": ...}}``. + */ +export type AssetExpressionAsset = { + asset: AssetExpressionAssetInfo; +}; + +/** + * Body of an ``asset`` leaf node. + * + * ``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when the expression is + * persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. It is left + * optional so a row persisted before id-enrichment (or migrated from the pre-3.0 dataset format) + * degrades gracefully instead of failing response validation. + */ +export type AssetExpressionAssetInfo = { + uri: string; + name: string; + group: string; + id?: number | null; +}; + +/** + * An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` or ``{"asset_ref": {"uri": ...}}``. + */ +export type AssetExpressionRef = { + asset_ref: { + [key: string]: (string); + }; +}; + /** * Asset serializer for responses. */ @@ -797,9 +857,7 @@ export type DAGDetailsResponse = { owners: Array<(string)>; catchup: boolean; dag_run_timeout: string | null; - asset_expression: { - [key: string]: unknown; -} | null; + asset_expression: AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll | null; doc_md: string | null; start_date: string | null; end_date: string | null; @@ -2263,9 +2321,7 @@ export type DAGWithLatestDagRunsResponse = { next_dagrun_run_after: string | null; allowed_run_types: Array | null; owners: Array<(string)>; - asset_expression: { - [key: string]: unknown; -} | null; + asset_expression: AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll | null; latest_dag_runs: Array; pending_actions: Array; is_favorite: boolean; @@ -2509,9 +2565,7 @@ export type NextRunAssetEventResponse = { * Response for the ``next_run_assets`` endpoint. */ export type NextRunAssetsResponse = { - asset_expression?: { - [key: string]: unknown; -} | null; + asset_expression?: AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll | null; events: Array; pending_partition_count?: number | null; }; @@ -2558,9 +2612,7 @@ export type PartitionedDagRunCollectionResponse = { partitioned_dag_runs: Array; total: number; asset_expressions?: { - [key: string]: ({ - [key: string]: unknown; -} | null); + [key: string]: (AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll | null); } | null; }; @@ -2577,9 +2629,7 @@ export type PartitionedDagRunDetailResponse = { assets: Array; total_required: number; total_received: number; - asset_expression?: { - [key: string]: unknown; -} | null; + asset_expression?: AssetExpressionAsset | AssetExpressionAlias | AssetExpressionRef | AssetExpressionAny | AssetExpressionAll | null; }; /** diff --git a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx index c8c0382fd7326..3d5e1f2c37a66 100644 --- a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx +++ b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx @@ -53,7 +53,7 @@ export const AssetExpression = ({ <> {"any" in expression ? ( - {expression.any?.map((item, index) => ( + {expression.any.map((item, index) => ( // eslint-disable-next-line react/no-array-index-key {"asset" in item || "alias" in item ? ( @@ -64,7 +64,7 @@ export const AssetExpression = ({ ) : ( )} - {expression.any && index === expression.any.length - 1 ? undefined : ( + {index === expression.any.length - 1 ? undefined : ( {translate("expression.or")} @@ -76,7 +76,7 @@ export const AssetExpression = ({ ) : undefined} {"all" in expression ? ( - {expression.all?.map((item, index) => ( + {expression.all.map((item, index) => ( // eslint-disable-next-line react/no-array-index-key {"asset" in item || "alias" in item ? ( diff --git a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx index 1a0d0b289a95a..7d2b0c9c5ae81 100644 --- a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx +++ b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx @@ -20,18 +20,21 @@ import { Box, HStack, Text } from "@chakra-ui/react"; import { FiDatabase } from "react-icons/fi"; import { PiRectangleDashed } from "react-icons/pi"; -import type { NextRunAssetEventResponse } from "openapi/requests/types.gen"; +import type { + AssetExpressionAlias, + AssetExpressionAsset, + NextRunAssetEventResponse, +} from "openapi/requests/types.gen"; import { RollupKeyChecklistPopover } from "src/components/RollupKeyChecklist"; import { RouterLink } from "src/components/ui"; import Time from "../Time"; -import type { AssetSummary } from "./types"; export const AssetNode = ({ asset, event, }: { - readonly asset: AssetSummary; + readonly asset: AssetExpressionAlias | AssetExpressionAsset; readonly event?: NextRunAssetEventResponse; }) => { const isFullyReceived = Boolean(event?.last_update); diff --git a/airflow-core/src/airflow/ui/src/components/AssetExpression/index.ts b/airflow-core/src/airflow/ui/src/components/AssetExpression/index.ts index c54f53ab6c515..5414b934cd2ea 100644 --- a/airflow-core/src/airflow/ui/src/components/AssetExpression/index.ts +++ b/airflow-core/src/airflow/ui/src/components/AssetExpression/index.ts @@ -18,4 +18,4 @@ */ export { AssetExpression } from "./AssetExpression"; -export type { ExpressionType, AssetSummary } from "./types"; +export type { ExpressionType } from "./types"; diff --git a/airflow-core/src/airflow/ui/src/components/AssetExpression/types.ts b/airflow-core/src/airflow/ui/src/components/AssetExpression/types.ts index cae9bfa524f04..486ebc5b49761 100644 --- a/airflow-core/src/airflow/ui/src/components/AssetExpression/types.ts +++ b/airflow-core/src/airflow/ui/src/components/AssetExpression/types.ts @@ -16,29 +16,6 @@ * specific language governing permissions and limitations * under the License. */ +import type { DAGDetailsResponse } from "openapi/requests/types.gen"; -type Asset = { - asset: { - group: string; - id: number; - name: string; - uri: string; - }; -}; - -type Alias = { - alias: { - group: string; - name: string; - }; -}; - -export type AssetSummary = Alias | Asset; - -export type ExpressionType = - | Alias - | Asset - | { - all?: Array; - any?: Array; - }; +export type ExpressionType = NonNullable; diff --git a/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx index 44ea0d76ee504..fe662437bf3a1 100644 --- a/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx +++ b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx @@ -21,7 +21,7 @@ import { FiDatabase } from "react-icons/fi"; import { usePartitionedDagRunServiceGetPendingPartitionedDagRun } from "openapi/queries"; import type { NextRunAssetEventResponse, PartitionedDagRunAssetResponse } from "openapi/requests/types.gen"; -import { AssetExpression, type ExpressionType } from "src/components/AssetExpression"; +import { AssetExpression } from "src/components/AssetExpression"; import { RollupKeyChecklist } from "src/components/RollupKeyChecklist"; import { Popover, RouterLink } from "src/components/ui"; @@ -35,7 +35,7 @@ type Props = { export const AssetProgressCell = ({ dagId, partitionKey, totalReceived, totalRequired }: Props) => { const { data, isLoading } = usePartitionedDagRunServiceGetPendingPartitionedDagRun({ dagId, partitionKey }); - const assetExpression = data?.asset_expression as ExpressionType | undefined; + const assetExpression = data?.asset_expression ?? undefined; const assets: Array = data?.assets ?? []; const hasRollup = assets.some((ak) => ak.is_rollup); diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx index 5f252cbca5811..3037cce997575 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx @@ -259,7 +259,7 @@ export const AssetSchedule = ({ assetExpression, dagId, timetablePartitioned, ti diff --git a/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_common.py b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_common.py new file mode 100644 index 0000000000000..2bf6fddb12289 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_common.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest +from pydantic import TypeAdapter, ValidationError + +from airflow.api_fastapi.core_api.datamodels.common import AssetExpression, MaybeAssetExpression + +# A single adapter is enough to validate/serialize the discriminated union. +_adapter: TypeAdapter[AssetExpression] = TypeAdapter(AssetExpression) +# The field type as actually wired onto the response models: tolerant of legacy shapes. +_field_adapter: TypeAdapter[MaybeAssetExpression] = TypeAdapter(MaybeAssetExpression) + +# Leaf nodes as actually stored in ``DagModel.asset_expression``: ``asset`` leaves are +# enriched with the resolved ``AssetModel.id`` (see +# ``DagModelOperation.update_dag_asset_expression``), while ``alias`` and ``asset_ref`` +# leaves are left as ``BaseAsset.as_expression()`` produced them. +_ASSET = {"asset": {"uri": "s3://bucket/key", "name": "my_asset", "group": "asset", "id": 7}} +_ALIAS = {"alias": {"name": "my_alias", "group": "asset"}} +_REF_BY_NAME = {"asset_ref": {"name": "by_name"}} +_REF_BY_URI = {"asset_ref": {"uri": "s3://bucket/key"}} + + +@pytest.mark.parametrize( + "expression", + [ + pytest.param(_ASSET, id="asset"), + pytest.param(_ALIAS, id="alias"), + pytest.param(_REF_BY_NAME, id="asset_ref_by_name"), + pytest.param(_REF_BY_URI, id="asset_ref_by_uri"), + pytest.param({"any": [_ASSET, _ALIAS]}, id="any"), + pytest.param({"all": [_ASSET, _REF_BY_NAME]}, id="all"), + pytest.param({"all": [{"any": [_ASSET]}, _ASSET]}, id="nested"), + ], +) +def test_asset_expression_round_trips_unchanged(expression: dict): + """The typed model must accept and re-serialize each stored expression byte-identically.""" + validated = _adapter.validate_python(expression) + assert _adapter.dump_python(validated, by_alias=True) == expression + + +def test_asset_expression_tolerates_legacy_asset_leaf_without_id(): + """ + ``asset`` leaves written by the current scheduler always carry ``id``, but a leaf persisted + before id-enrichment may not. Such a leaf must still validate -- with ``id`` defaulting to + ``None`` -- so a not-yet-enriched row is served instead of returning a 500. (Genuinely legacy + pre-3.0 dataset shapes are a separate concern, covered below via ``MaybeAssetExpression``.) + """ + validated = _adapter.validate_python({"asset": {"uri": "s3://b", "name": "n", "group": "asset"}}) + assert validated.asset.id is None + + +@pytest.mark.parametrize( + "invalid", + [ + pytest.param({}, id="empty"), + pytest.param({"unknown": {}}, id="unknown_key"), + pytest.param({"asset": _ASSET["asset"], "alias": _ALIAS["alias"]}, id="ambiguous_two_keys"), + pytest.param({"asset": {"name": "a", "id": 1}}, id="asset_missing_fields"), + ], +) +def test_asset_expression_rejects_invalid_shapes(invalid: dict): + with pytest.raises(ValidationError): + _adapter.validate_python(invalid) + + +# Shapes a released Airflow 2.x stored in ``dataset_expression`` and that the 3.0 column rename +# (``0041_rename_dataset_as_asset``) carried over verbatim into ``asset_expression``: dataset leaves +# serialized as a bare uri string, aliases as ``{"alias": ""}``, composites over those strings. +_LEGACY_SHAPES = [ + pytest.param("s3://bucket/key", id="bare_uri_string"), + pytest.param({"any": ["s3://a", "s3://b"]}, id="any_of_uri_strings"), + pytest.param({"all": ["s3://a", "s3://b"]}, id="all_of_uri_strings"), + pytest.param({"alias": "my_alias"}, id="alias_as_string"), +] + + +@pytest.mark.parametrize("legacy", _LEGACY_SHAPES) +def test_field_coerces_legacy_pre_3_0_shapes_to_none(legacy): + """ + A row written by the pre-3.0 dataset scheduler and not yet re-parsed still holds a legacy shape + the typed model cannot describe. The field must serve it as ``None`` -- the blank render the UI + showed while this field was an untyped ``dict`` -- rather than 500 the whole endpoint. + """ + assert _field_adapter.validate_python(legacy) is None + + +@pytest.mark.parametrize( + "expression", + [ + pytest.param(None, id="none"), + pytest.param(_ASSET, id="asset"), + pytest.param({"any": [_ASSET, _ALIAS]}, id="any"), + ], +) +def test_field_preserves_current_shapes(expression): + """Tolerance must not swallow valid current expressions: they pass through unchanged.""" + validated = _field_adapter.validate_python(expression) + if expression is None: + assert validated is None + else: + assert _field_adapter.dump_python(validated, by_alias=True) == expression diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index f3a3eef61c0ac..fd01b167263de 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -137,7 +137,17 @@ def _create_asset_test_data(self, session=None): is_stale=False, is_paused=False, owners="airflow", - asset_expression={"any": [{"uri": "test://scheduled_asset"}]}, + asset_expression={ + "any": [ + { + "asset": { + "uri": "test://scheduled_asset", + "name": "scheduled_asset", + "group": "test-group", + } + } + ] + }, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, @@ -156,7 +166,9 @@ def _create_asset_test_data(self, session=None): is_stale=False, is_paused=False, owners="airflow", - asset_expression={"any": [{"uri": "test://asset1"}]}, + asset_expression={ + "any": [{"asset": {"uri": "test://asset1", "name": "test_asset_1", "group": "test-group"}}] + }, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, @@ -174,7 +186,11 @@ def _create_asset_test_data(self, session=None): is_stale=False, is_paused=False, owners="airflow", - asset_expression={"any": [{"uri": "s3://bucket/dataset"}]}, + asset_expression={ + "any": [ + {"asset": {"uri": "s3://bucket/dataset", "name": "dataset_asset", "group": "test-group"}} + ] + }, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, @@ -1203,6 +1219,19 @@ def test_dag_details_includes_is_favorite_field(self, session, test_client): assert isinstance(body["is_favorite"], bool) assert body["is_favorite"] is False + def test_dag_details_serves_legacy_asset_expression_as_null(self, session, test_client): + """A pre-3.0 dataset-format ``asset_expression`` that the typed model cannot describe is + served as ``null`` instead of 500ing the endpoint (handled by ``MaybeAssetExpression``).""" + dag_model = session.get(DagModel, DAG2_ID) + # The 2.x dataset scheduler stored bare-string leaves; the 3.0 column rename kept them verbatim. + dag_model.asset_expression = {"any": ["s3://legacy-a", "s3://legacy-b"]} + session.commit() + + response = test_client.get(f"/dags/{DAG2_ID}/details") + + assert response.status_code == 200 + assert response.json()["asset_expression"] is None + def test_dag_details_includes_active_runs_count(self, session, test_client): """Test that DAG details include the active_runs_count field.""" # Create running and queued DAG runs for DAG2 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index bd7ce6dd44859..7fbe5a86d2e0b 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -61,6 +61,39 @@ class AssetEventAccessControl(BaseModel): allow_global: Annotated[bool | None, Field(title="Allow Global")] = True +class AssetExpressionAliasInfo(BaseModel): + """ + Body of an ``alias`` leaf node. + """ + + name: Annotated[str, Field(title="Name")] + group: Annotated[str, Field(title="Group")] + + +class AssetExpressionAssetInfo(BaseModel): + """ + Body of an ``asset`` leaf node. + + ``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when the expression is + persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. It is left + optional so a row persisted before id-enrichment (or migrated from the pre-3.0 dataset format) + degrades gracefully instead of failing response validation. + """ + + uri: Annotated[str, Field(title="Uri")] + name: Annotated[str, Field(title="Name")] + group: Annotated[str, Field(title="Group")] + id: Annotated[int | None, Field(title="Id")] = None + + +class AssetExpressionRef(BaseModel): + """ + An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` or ``{"asset_ref": {"uri": ...}}``. + """ + + asset_ref: Annotated[dict[str, str], Field(title="Asset Ref")] + + class AssetStateStoreWriterKind(str, Enum): """ Identifies what kind of writer last updated an asset state store entry. @@ -1305,6 +1338,22 @@ class AssetEventResponse(BaseModel): partition_key: Annotated[str | None, Field(title="Partition Key")] = None +class AssetExpressionAlias(BaseModel): + """ + An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``. + """ + + alias: AssetExpressionAliasInfo + + +class AssetExpressionAsset(BaseModel): + """ + An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": ...}}``. + """ + + asset: AssetExpressionAssetInfo + + class AssetResponse(BaseModel): """ Asset serializer for responses. @@ -1700,76 +1749,6 @@ class ConnectionCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] -class DAGDetailsResponse(BaseModel): - """ - Specific serializer for Dag Details responses. - """ - - dag_id: Annotated[str, Field(title="Dag Id")] - dag_display_name: Annotated[str, Field(title="Dag Display Name")] - is_paused: Annotated[bool, Field(title="Is Paused")] - is_stale: Annotated[bool, Field(title="Is Stale")] - last_parsed_time: Annotated[datetime | None, Field(title="Last Parsed Time")] = None - last_parse_duration: Annotated[float | None, Field(title="Last Parse Duration")] = None - last_expired: Annotated[datetime | None, Field(title="Last Expired")] = None - bundle_name: Annotated[str | None, Field(title="Bundle Name")] = None - bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None - relative_fileloc: Annotated[str | None, Field(title="Relative Fileloc")] = None - fileloc: Annotated[str, Field(title="Fileloc")] - description: Annotated[str | None, Field(title="Description")] = None - timetable_summary: Annotated[str | None, Field(title="Timetable Summary")] = None - timetable_description: Annotated[str | None, Field(title="Timetable Description")] = None - timetable_partitioned: Annotated[bool, Field(title="Timetable Partitioned")] - timetable_periodic: Annotated[bool, Field(title="Timetable Periodic")] - tags: Annotated[list[DagTagResponse], Field(title="Tags")] - max_active_tasks: Annotated[int, Field(title="Max Active Tasks")] - max_active_runs: Annotated[int | None, Field(title="Max Active Runs")] = None - max_consecutive_failed_dag_runs: Annotated[int, Field(title="Max Consecutive Failed Dag Runs")] - has_task_concurrency_limits: Annotated[bool, Field(title="Has Task Concurrency Limits")] - has_import_errors: Annotated[bool, Field(title="Has Import Errors")] - next_dagrun_logical_date: Annotated[datetime | None, Field(title="Next Dagrun Logical Date")] = None - next_dagrun_data_interval_start: Annotated[ - datetime | None, Field(title="Next Dagrun Data Interval Start") - ] = None - next_dagrun_data_interval_end: Annotated[ - datetime | None, Field(title="Next Dagrun Data Interval End") - ] = None - next_dagrun_run_after: Annotated[datetime | None, Field(title="Next Dagrun Run After")] = None - allowed_run_types: Annotated[list[DagRunType] | None, Field(title="Allowed Run Types")] = None - owners: Annotated[list[str], Field(title="Owners")] - catchup: Annotated[bool, Field(title="Catchup")] - dag_run_timeout: Annotated[timedelta | None, Field(title="Dag Run Timeout")] = None - asset_expression: Annotated[dict[str, Any] | None, Field(title="Asset Expression")] = None - doc_md: Annotated[str | None, Field(title="Doc Md")] = None - start_date: Annotated[datetime | None, Field(title="Start Date")] = None - end_date: Annotated[datetime | None, Field(title="End Date")] = None - is_paused_upon_creation: Annotated[bool | None, Field(title="Is Paused Upon Creation")] = None - params: Annotated[dict[str, Any] | None, Field(title="Params")] = None - render_template_as_native_obj: Annotated[bool, Field(title="Render Template As Native Obj")] - template_search_path: Annotated[list[str] | None, Field(title="Template Search Path")] = None - timezone: Annotated[str | None, Field(title="Timezone")] = None - last_parsed: Annotated[datetime | None, Field(title="Last Parsed")] = None - default_args: Annotated[dict[str, Any] | None, Field(title="Default Args")] = None - rerun_with_latest_version: Annotated[bool | None, Field(title="Rerun With Latest Version")] = None - owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")] = None - is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False - active_runs_count: Annotated[int | None, Field(title="Active Runs Count")] = 0 - is_backfillable: Annotated[ - bool, Field(description="Whether this Dag's schedule supports backfilling.", title="Is Backfillable") - ] - file_token: Annotated[str, Field(description="Return file token.", title="File Token")] - concurrency: Annotated[ - int, - Field( - description="Return max_active_tasks as concurrency.\n\nDeprecated: Use max_active_tasks instead.", - title="Concurrency", - ), - ] - latest_dag_version: Annotated[ - DagVersionResponse | None, Field(description="Return the latest DagVersion.") - ] = None - - class DAGResponse(BaseModel): """ Dag serializer for responses. @@ -2527,3 +2506,118 @@ class BulkBodyBulkTaskInstanceBody(BaseModel): ], Field(title="Actions"), ] + + +class AssetExpressionAll(BaseModel): + """ + An "and" node: ``{"all": [...]}`` -- satisfied when all children are satisfied. + """ + + all: Annotated[ + list[ + AssetExpressionAsset + | AssetExpressionAlias + | AssetExpressionRef + | AssetExpressionAny + | AssetExpressionAll + ], + Field(title="All"), + ] + + +class AssetExpressionAny(BaseModel): + """ + An "or" node: ``{"any": [...]}`` -- satisfied when any child is satisfied. + """ + + any: Annotated[ + list[ + AssetExpressionAsset + | AssetExpressionAlias + | AssetExpressionRef + | AssetExpressionAny + | AssetExpressionAll + ], + Field(title="Any"), + ] + + +class DAGDetailsResponse(BaseModel): + """ + Specific serializer for Dag Details responses. + """ + + dag_id: Annotated[str, Field(title="Dag Id")] + dag_display_name: Annotated[str, Field(title="Dag Display Name")] + is_paused: Annotated[bool, Field(title="Is Paused")] + is_stale: Annotated[bool, Field(title="Is Stale")] + last_parsed_time: Annotated[datetime | None, Field(title="Last Parsed Time")] = None + last_parse_duration: Annotated[float | None, Field(title="Last Parse Duration")] = None + last_expired: Annotated[datetime | None, Field(title="Last Expired")] = None + bundle_name: Annotated[str | None, Field(title="Bundle Name")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None + relative_fileloc: Annotated[str | None, Field(title="Relative Fileloc")] = None + fileloc: Annotated[str, Field(title="Fileloc")] + description: Annotated[str | None, Field(title="Description")] = None + timetable_summary: Annotated[str | None, Field(title="Timetable Summary")] = None + timetable_description: Annotated[str | None, Field(title="Timetable Description")] = None + timetable_partitioned: Annotated[bool, Field(title="Timetable Partitioned")] + timetable_periodic: Annotated[bool, Field(title="Timetable Periodic")] + tags: Annotated[list[DagTagResponse], Field(title="Tags")] + max_active_tasks: Annotated[int, Field(title="Max Active Tasks")] + max_active_runs: Annotated[int | None, Field(title="Max Active Runs")] = None + max_consecutive_failed_dag_runs: Annotated[int, Field(title="Max Consecutive Failed Dag Runs")] + has_task_concurrency_limits: Annotated[bool, Field(title="Has Task Concurrency Limits")] + has_import_errors: Annotated[bool, Field(title="Has Import Errors")] + next_dagrun_logical_date: Annotated[datetime | None, Field(title="Next Dagrun Logical Date")] = None + next_dagrun_data_interval_start: Annotated[ + datetime | None, Field(title="Next Dagrun Data Interval Start") + ] = None + next_dagrun_data_interval_end: Annotated[ + datetime | None, Field(title="Next Dagrun Data Interval End") + ] = None + next_dagrun_run_after: Annotated[datetime | None, Field(title="Next Dagrun Run After")] = None + allowed_run_types: Annotated[list[DagRunType] | None, Field(title="Allowed Run Types")] = None + owners: Annotated[list[str], Field(title="Owners")] + catchup: Annotated[bool, Field(title="Catchup")] + dag_run_timeout: Annotated[timedelta | None, Field(title="Dag Run Timeout")] = None + asset_expression: Annotated[ + AssetExpressionAsset + | AssetExpressionAlias + | AssetExpressionRef + | AssetExpressionAny + | AssetExpressionAll + | None, + Field(title="Asset Expression"), + ] = None + doc_md: Annotated[str | None, Field(title="Doc Md")] = None + start_date: Annotated[datetime | None, Field(title="Start Date")] = None + end_date: Annotated[datetime | None, Field(title="End Date")] = None + is_paused_upon_creation: Annotated[bool | None, Field(title="Is Paused Upon Creation")] = None + params: Annotated[dict[str, Any] | None, Field(title="Params")] = None + render_template_as_native_obj: Annotated[bool, Field(title="Render Template As Native Obj")] + template_search_path: Annotated[list[str] | None, Field(title="Template Search Path")] = None + timezone: Annotated[str | None, Field(title="Timezone")] = None + last_parsed: Annotated[datetime | None, Field(title="Last Parsed")] = None + default_args: Annotated[dict[str, Any] | None, Field(title="Default Args")] = None + rerun_with_latest_version: Annotated[bool | None, Field(title="Rerun With Latest Version")] = None + owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")] = None + is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False + active_runs_count: Annotated[int | None, Field(title="Active Runs Count")] = 0 + is_backfillable: Annotated[ + bool, Field(description="Whether this Dag's schedule supports backfilling.", title="Is Backfillable") + ] + file_token: Annotated[str, Field(description="Return file token.", title="File Token")] + concurrency: Annotated[ + int, + Field( + description="Return max_active_tasks as concurrency.\n\nDeprecated: Use max_active_tasks instead.", + title="Concurrency", + ), + ] + latest_dag_version: Annotated[ + DagVersionResponse | None, Field(description="Return the latest DagVersion.") + ] = None + + +AssetExpressionAll.model_rebuild()