Skip to content
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def _trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
"""
Expand Down Expand Up @@ -118,6 +119,7 @@ def _trigger_dag(
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
state=DagRunState.QUEUED,
partition_key=partition_key,
session=session,
)

Expand All @@ -135,6 +137,7 @@ def trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
"""
Expand Down Expand Up @@ -166,6 +169,7 @@ def trigger_dag(
replace_microseconds=replace_microseconds,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
partition_key=partition_key,
session=session,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TriggerDAGRunPayload(StrictBaseModel):
logical_date: UtcDateTime | None = None
conf: dict = Field(default_factory=dict)
reset_dag_run: bool = False
partition_key: str | None = None


class DagRunStateResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def trigger_dag_run(
logical_date=payload.logical_date,
triggered_by=DagRunTriggeredByType.OPERATOR,
replace_microseconds=False,
partition_key=payload.partition_key,
session=session,
)
except DagRunAlreadyExists:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema

from airflow.api_fastapi.execution_api.datamodels.asset_event import AssetEventResponse, AssetEventsResponse
from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext


Expand All @@ -31,6 +32,7 @@ class AddPartitionKeyField(VersionChange):
instructions_to_migrate_to_previous_version = (
schema(DagRun).field("partition_key").didnt_exist,
schema(AssetEventResponse).field("partition_key").didnt_exist,
schema(TriggerDAGRunPayload).field("partition_key").didnt_exist,
)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ def test_trigger_dag_run(self, client, session, dag_maker):
assert dag_run.conf == {"key1": "value1"}
assert dag_run.logical_date == logical_date

def test_trigger_dag_run_with_partition_key(self, client, session, dag_maker):
dag_id = "test_trigger_dag_run_partition_key"
run_id = "test_run_id"
logical_date = timezone.datetime(2025, 2, 20)
partition_key = "2025-02-20"

with dag_maker(dag_id=dag_id, session=session, serialized=True):
EmptyOperator(task_id="test_task")

session.commit()

response = client.post(
f"/execution/dag-runs/{dag_id}/{run_id}",
json={
"logical_date": logical_date.isoformat(),
"conf": {"key1": "value1"},
"partition_key": partition_key,
},
)

assert response.status_code == 204

dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
assert dag_run.conf == {"key1": "value1"}
assert dag_run.logical_date == logical_date
assert dag_run.partition_key == partition_key

def test_trigger_dag_run_dag_not_found(self, client):
"""Test that a DAG that does not exist cannot be triggered."""
dag_id = "dag_not_found"
Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ class TriggerDAGRunPayload(BaseModel):
logical_date: Annotated[AwareDatetime | None, Field(title="Logical Date")] = None
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False
partition_key: Annotated[str | None, Field(title="Partition Key")] = None


class UpdateHITLDetailPayload(BaseModel):
Expand Down
Loading