diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py b/airflow-core/src/airflow/api/common/trigger_dag.py index 614140b58c4a9..77912f58d2637 100644 --- a/airflow-core/src/airflow/api/common/trigger_dag.py +++ b/airflow-core/src/airflow/api/common/trigger_dag.py @@ -50,6 +50,7 @@ def _trigger_dag( conf: dict | str | None = None, logical_date: datetime | None = None, replace_microseconds: bool = True, + partition_key: str | None = None, session: Session = NEW_SESSION, ) -> DagRun | None: """ @@ -118,6 +119,7 @@ def _trigger_dag( triggered_by=triggered_by, triggering_user_name=triggering_user_name, state=DagRunState.QUEUED, + partition_key=partition_key, session=session, ) @@ -135,6 +137,7 @@ def trigger_dag( conf: dict | str | None = None, logical_date: datetime | None = None, replace_microseconds: bool = True, + partition_key: str | None = None, session: Session = NEW_SESSION, ) -> DagRun | None: """ @@ -166,6 +169,7 @@ def trigger_dag( replace_microseconds=replace_microseconds, triggered_by=triggered_by, triggering_user_name=triggering_user_name, + partition_key=partition_key, session=session, ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py index 1f73156cebf5a..32b0b7fead07b 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py @@ -30,6 +30,7 @@ class TriggerDAGRunPayload(StrictBaseModel): logical_date: UtcDateTime | None = None conf: dict = Field(default_factory=dict) reset_dag_run: bool = False + partition_key: str | None = None class DagRunStateResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 7763850b5ee4e..189201db68812 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -122,6 +122,7 @@ def trigger_dag_run( logical_date=payload.logical_date, triggered_by=DagRunTriggeredByType.OPERATOR, replace_microseconds=False, + partition_key=payload.partition_key, session=session, ) except DagRunAlreadyExists: diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py index b11811179f1e8..9363d825cef89 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py @@ -20,6 +20,7 @@ from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema from airflow.api_fastapi.execution_api.datamodels.asset_event import AssetEventResponse, AssetEventsResponse +from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext @@ -31,6 +32,7 @@ class AddPartitionKeyField(VersionChange): instructions_to_migrate_to_previous_version = ( schema(DagRun).field("partition_key").didnt_exist, schema(AssetEventResponse).field("partition_key").didnt_exist, + schema(TriggerDAGRunPayload).field("partition_key").didnt_exist, ) @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index 91c9c314a0e6d..1691ad535310b 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -61,6 +61,33 @@ def test_trigger_dag_run(self, client, session, dag_maker): assert dag_run.conf == {"key1": "value1"} assert dag_run.logical_date == logical_date + def test_trigger_dag_run_with_partition_key(self, client, session, dag_maker): + dag_id = "test_trigger_dag_run_partition_key" + run_id = "test_run_id" + logical_date = timezone.datetime(2025, 2, 20) + partition_key = "2025-02-20" + + with dag_maker(dag_id=dag_id, session=session, serialized=True): + EmptyOperator(task_id="test_task") + + session.commit() + + response = client.post( + f"/execution/dag-runs/{dag_id}/{run_id}", + json={ + "logical_date": logical_date.isoformat(), + "conf": {"key1": "value1"}, + "partition_key": partition_key, + }, + ) + + assert response.status_code == 204 + + dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one() + assert dag_run.conf == {"key1": "value1"} + assert dag_run.logical_date == logical_date + assert dag_run.partition_key == partition_key + def test_trigger_dag_run_dag_not_found(self, client): """Test that a DAG that does not exist cannot be triggered.""" dag_id = "dag_not_found" diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 149aa5d7dcb0d..0621df05b0eed 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -355,6 +355,7 @@ class TriggerDAGRunPayload(BaseModel): logical_date: Annotated[AwareDatetime | None, Field(title="Logical Date")] = None conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False + partition_key: Annotated[str | None, Field(title="Partition Key")] = None class UpdateHITLDetailPayload(BaseModel):