Skip to content

Commit 519c4f1

Browse files
vatsrahul1001Lee-W
andcommitted
fix: use Dag form when materializing asset (#64211) (#65303)
* fix: use Dag form when materializing asset * fixup! fix: use Dag form when materializing asset * fixup! fixup! fix: use Dag form when materializing asset * fixup! fixup! fixup! fix: use Dag form when materializing asset * fixup! fixup! fixup! fixup! fix: use Dag form when materializing asset * test: add test case test_should_respond_200_with_trigger_fields_without_dag_run_id * fix: restore useTrigger in TriggerDAGModal after TriggerDAGForm refactor TriggerDAGForm no longer owns the trigger mutation after the refactor (onSubmitTrigger was made a prop). TriggerDAGModal must now supply useTrigger and pass error/isPending/onSubmitTrigger down to the form, otherwise clicking Trigger does nothing and dagRunId stays null. (cherry picked from commit b312314) Co-authored-by: Wei Lee <weilee.rx@gmail.com>
1 parent 7c4dbf9 commit 519c4f1

12 files changed

Lines changed: 382 additions & 82 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@
1919

2020
from collections.abc import Iterable
2121
from datetime import datetime
22+
from typing import TYPE_CHECKING
2223

23-
from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator
24+
from pydantic import AliasPath, AwareDatetime, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator
2425

2526
from airflow._shared.secrets_masker import redact
27+
from airflow._shared.timezones import timezone
2628
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
29+
from airflow.api_fastapi.core_api.datamodels.dag_run import TriggerDAGRunPostBody
30+
from airflow.utils.types import DagRunType
31+
32+
if TYPE_CHECKING:
33+
from airflow.serialization.definitions.dag import SerializedDAG
2734

2835

2936
class DagScheduleAssetReference(StrictBaseModel):
@@ -185,3 +192,21 @@ def set_from_rest_api(cls, v: dict) -> dict:
185192
return v
186193

187194
model_config = ConfigDict(extra="forbid")
195+
196+
197+
class MaterializeAssetBody(TriggerDAGRunPostBody):
198+
"""Materialize asset request."""
199+
200+
logical_date: AwareDatetime | None = None
201+
202+
def validate_context(self, dag: SerializedDAG) -> dict:
203+
params = super().validate_context(dag)
204+
if self.dag_run_id is None:
205+
params["run_id"] = dag.timetable.generate_run_id(
206+
run_type=DagRunType.ASSET_MATERIALIZATION,
207+
run_after=timezone.coerce_datetime(params["run_after"]),
208+
data_interval=params["data_interval"],
209+
)
210+
return params
211+
212+
model_config = ConfigDict(extra="forbid")

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,14 @@ paths:
484484
schema:
485485
type: integer
486486
title: Asset Id
487+
requestBody:
488+
content:
489+
application/json:
490+
schema:
491+
anyOf:
492+
- $ref: '#/components/schemas/MaterializeAssetBody'
493+
- type: 'null'
494+
title: Body
487495
responses:
488496
'200':
489497
description: Successful Response
@@ -11943,6 +11951,57 @@ components:
1194311951
type: object
1194411952
title: LastAssetEventResponse
1194511953
description: Last asset event response serializer.
11954+
MaterializeAssetBody:
11955+
properties:
11956+
dag_run_id:
11957+
anyOf:
11958+
- type: string
11959+
- type: 'null'
11960+
title: Dag Run Id
11961+
data_interval_start:
11962+
anyOf:
11963+
- type: string
11964+
format: date-time
11965+
- type: 'null'
11966+
title: Data Interval Start
11967+
data_interval_end:
11968+
anyOf:
11969+
- type: string
11970+
format: date-time
11971+
- type: 'null'
11972+
title: Data Interval End
11973+
logical_date:
11974+
anyOf:
11975+
- type: string
11976+
format: date-time
11977+
- type: 'null'
11978+
title: Logical Date
11979+
run_after:
11980+
anyOf:
11981+
- type: string
11982+
format: date-time
11983+
- type: 'null'
11984+
title: Run After
11985+
conf:
11986+
anyOf:
11987+
- additionalProperties: true
11988+
type: object
11989+
- type: 'null'
11990+
title: Conf
11991+
note:
11992+
anyOf:
11993+
- type: string
11994+
- type: 'null'
11995+
title: Note
11996+
partition_key:
11997+
anyOf:
11998+
- type: string
11999+
- type: 'null'
12000+
title: Partition Key
12001+
additionalProperties: false
12002+
type: object
12003+
title: MaterializeAssetBody
12004+
description: Materialize asset request.
1194612005
PatchTaskInstanceBody:
1194712006
properties:
1194812007
new_state:

airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
AssetEventResponse,
5555
AssetResponse,
5656
CreateAssetEventsBody,
57+
MaterializeAssetBody,
5758
QueuedEventCollectionResponse,
5859
QueuedEventResponse,
5960
)
@@ -387,6 +388,7 @@ def materialize_asset(
387388
dag_bag: DagBagDep,
388389
user: GetUserDep,
389390
session: SessionDep,
391+
body: MaterializeAssetBody | None = None,
390392
) -> DAGRunResponse:
391393
"""Materialize an asset by triggering a DAG run that produces it."""
392394
dag_id_it = iter(
@@ -425,17 +427,19 @@ def materialize_asset(
425427
f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs",
426428
)
427429

430+
params = (body or MaterializeAssetBody()).validate_context(dag)
428431
return dag.create_dagrun(
429-
run_id=dag.timetable.generate_run_id(
430-
run_type=DagRunType.ASSET_MATERIALIZATION,
431-
run_after=(run_after := timezone.coerce_datetime(timezone.utcnow())),
432-
data_interval=None,
433-
),
434-
run_after=run_after,
432+
run_id=params["run_id"],
433+
logical_date=params["logical_date"],
434+
data_interval=params["data_interval"],
435+
run_after=params["run_after"],
436+
conf=params["conf"],
435437
run_type=DagRunType.ASSET_MATERIALIZATION,
436438
triggered_by=DagRunTriggeredByType.REST_API,
437439
triggering_user_name=user.get_name(),
438440
state=DagRunState.QUEUED,
441+
partition_key=params["partition_key"],
442+
note=params["note"],
439443
session=session,
440444
)
441445

airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query";
44
import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen";
5-
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
5+
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
66
import * as Common from "./common";
77
/**
88
* Get Assets
@@ -1765,14 +1765,17 @@ export const useAssetServiceCreateAssetEvent = <TData = Common.AssetServiceCreat
17651765
* Materialize an asset by triggering a DAG run that produces it.
17661766
* @param data The data for the request.
17671767
* @param data.assetId
1768+
* @param data.requestBody
17681769
* @returns DAGRunResponse Successful Response
17691770
* @throws ApiError
17701771
*/
17711772
export const useAssetServiceMaterializeAsset = <TData = Common.AssetServiceMaterializeAssetMutationResult, TError = unknown, TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
17721773
assetId: number;
1774+
requestBody?: MaterializeAssetBody;
17731775
}, TContext>, "mutationFn">) => useMutation<TData, TError, {
17741776
assetId: number;
1775-
}, TContext>({ mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise<TData>, ...options });
1777+
requestBody?: MaterializeAssetBody;
1778+
}, TContext>({ mutationFn: ({ assetId, requestBody }) => AssetService.materializeAsset({ assetId, requestBody }) as unknown as Promise<TData>, ...options });
17761779
/**
17771780
* Create Backfill
17781781
* @param data The data for the request.

airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4461,6 +4461,108 @@ export const $LastAssetEventResponse = {
44614461
description: 'Last asset event response serializer.'
44624462
} as const;
44634463

4464+
export const $MaterializeAssetBody = {
4465+
properties: {
4466+
dag_run_id: {
4467+
anyOf: [
4468+
{
4469+
type: 'string'
4470+
},
4471+
{
4472+
type: 'null'
4473+
}
4474+
],
4475+
title: 'Dag Run Id'
4476+
},
4477+
data_interval_start: {
4478+
anyOf: [
4479+
{
4480+
type: 'string',
4481+
format: 'date-time'
4482+
},
4483+
{
4484+
type: 'null'
4485+
}
4486+
],
4487+
title: 'Data Interval Start'
4488+
},
4489+
data_interval_end: {
4490+
anyOf: [
4491+
{
4492+
type: 'string',
4493+
format: 'date-time'
4494+
},
4495+
{
4496+
type: 'null'
4497+
}
4498+
],
4499+
title: 'Data Interval End'
4500+
},
4501+
logical_date: {
4502+
anyOf: [
4503+
{
4504+
type: 'string',
4505+
format: 'date-time'
4506+
},
4507+
{
4508+
type: 'null'
4509+
}
4510+
],
4511+
title: 'Logical Date'
4512+
},
4513+
run_after: {
4514+
anyOf: [
4515+
{
4516+
type: 'string',
4517+
format: 'date-time'
4518+
},
4519+
{
4520+
type: 'null'
4521+
}
4522+
],
4523+
title: 'Run After'
4524+
},
4525+
conf: {
4526+
anyOf: [
4527+
{
4528+
additionalProperties: true,
4529+
type: 'object'
4530+
},
4531+
{
4532+
type: 'null'
4533+
}
4534+
],
4535+
title: 'Conf'
4536+
},
4537+
note: {
4538+
anyOf: [
4539+
{
4540+
type: 'string'
4541+
},
4542+
{
4543+
type: 'null'
4544+
}
4545+
],
4546+
title: 'Note'
4547+
},
4548+
partition_key: {
4549+
anyOf: [
4550+
{
4551+
type: 'string'
4552+
},
4553+
{
4554+
type: 'null'
4555+
}
4556+
],
4557+
title: 'Partition Key'
4558+
}
4559+
},
4560+
additionalProperties: false,
4561+
type: 'object',
4562+
title: 'MaterializeAssetBody',
4563+
description: 'Materialize asset request.'
4564+
} as const;
4565+
44644566
export const $PatchTaskInstanceBody = {
44654567
properties: {
44664568
new_state: {

airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ export class AssetService {
172172
* Materialize an asset by triggering a DAG run that produces it.
173173
* @param data The data for the request.
174174
* @param data.assetId
175+
* @param data.requestBody
175176
* @returns DAGRunResponse Successful Response
176177
* @throws ApiError
177178
*/
@@ -182,6 +183,8 @@ export class AssetService {
182183
path: {
183184
asset_id: data.assetId
184185
},
186+
body: data.requestBody,
187+
mediaType: 'application/json',
185188
errors: {
186189
400: 'Bad Request',
187190
401: 'Unauthorized',

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,22 @@ export type LastAssetEventResponse = {
11331133
timestamp?: string | null;
11341134
};
11351135

1136+
/**
1137+
* Materialize asset request.
1138+
*/
1139+
export type MaterializeAssetBody = {
1140+
dag_run_id?: string | null;
1141+
data_interval_start?: string | null;
1142+
data_interval_end?: string | null;
1143+
logical_date?: string | null;
1144+
run_after?: string | null;
1145+
conf?: {
1146+
[key: string]: unknown;
1147+
} | null;
1148+
note?: string | null;
1149+
partition_key?: string | null;
1150+
};
1151+
11361152
/**
11371153
* Request body for Clear Task Instances endpoint.
11381154
*/
@@ -2322,6 +2338,7 @@ export type CreateAssetEventResponse = AssetEventResponse;
23222338

23232339
export type MaterializeAssetData = {
23242340
assetId: number;
2341+
requestBody?: MaterializeAssetBody | null;
23252342
};
23262343

23272344
export type MaterializeAssetResponse = DAGRunResponse;

0 commit comments

Comments
 (0)