diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index 08ee1726ccc22..eb34c958e040a 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -84,7 +84,10 @@ def _trigger_dag( data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date) run_id = run_id or dag.timetable.generate_run_id( - run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval + run_type=DagRunType.MANUAL, + logical_date=coerced_logical_date, + data_interval=data_interval, + run_after=data_interval.end, ) # This intentionally does not use 'session' in the current scope because it diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index c2560613def70..d0db532521eba 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -88,7 +88,8 @@ def autogenerate(self, data, **kwargs): if "dag_run_id" not in data: try: data["dag_run_id"] = DagRun.generate_run_id( - DagRunType.MANUAL, timezone.parse(data["logical_date"]) + DagRunType.MANUAL, + timezone.parse(data["logical_date"]), ) except (ParserError, TypeError) as err: raise BadRequest("Incorrect datetime argument", detail=str(err)) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 80db78d87f512..d048e27fd3d57 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -371,6 +371,7 @@ def trigger_dag_run( run_type=DagRunType.MANUAL, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, ) dag_run = dag.create_dagrun( diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 1e98dbf2aba29..07d22774f5406 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1281,6 +1281,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - run_type=DagRunType.SCHEDULED, logical_date=dag_model.next_dagrun, data_interval=data_interval, + run_after=dag_model.next_dagrun_create_after, ), logical_date=dag_model.next_dagrun, data_interval=data_interval, @@ -1391,6 +1392,7 @@ def _create_dag_runs_asset_triggered( run_type=DagRunType.ASSET_TRIGGERED, logical_date=logical_date, data_interval=data_interval, + run_after=max(logical_dates.values()), session=session, events=asset_events, ), diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index a2347e76fdc6e..e64cee1324f32 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -293,6 +293,7 @@ def _create_backfill_dag_run( run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, data_interval=info.data_interval, + run_after=info.run_after, ), logical_date=info.logical_date, data_interval=info.data_interval, diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 425cbaca68880..0972004d31847 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -628,7 +628,11 @@ def run( # This is _mostly_ only used in tests dr = DagRun( dag_id=self.dag_id, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date), + run_id=DagRun.generate_run_id( + DagRunType.MANUAL, + info.logical_date, + run_after=info.run_after, + ), run_type=DagRunType.MANUAL, logical_date=info.logical_date, data_interval=info.data_interval, diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 8fae1604ed761..11feeaaedd674 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1638,7 +1638,7 @@ def add_logger_if_needed(ti: TaskInstance): logical_date=logical_date, data_interval=data_interval, run_after=data_interval.end, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date, run_after=data_interval.end), session=session, conf=run_conf, triggered_by=DagRunTriggeredByType.TEST, diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 22757c972e953..1e61ef41a54c0 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -621,10 +621,12 @@ def find_duplicate(cls, dag_id: str, run_id: str, *, session: Session = NEW_SESS return session.scalars(select(cls).where(cls.dag_id == dag_id, cls.run_id == run_id)).one_or_none() @staticmethod - def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str: + def generate_run_id( + run_type: DagRunType, logical_date: datetime | None, run_after: datetime | None = None + ) -> str: """Generate Run ID based on Run Type and logical Date.""" # _Ensure_ run_type is a DagRunType, not just a string from user code - return DagRunType(run_type).generate_run_id(logical_date) + return DagRunType(run_type).generate_run_id(logical_date, run_after) @staticmethod @provide_session diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py index da81090ff9a6d..029b77b4b4b23 100644 --- a/airflow/timetables/base.py +++ b/airflow/timetables/base.py @@ -281,8 +281,9 @@ def generate_run_id( self, *, run_type: DagRunType, - logical_date: DateTime, + logical_date: DateTime | None, data_interval: DataInterval | None, + run_after: DateTime | None = None, **extra, ) -> str: - return run_type.generate_run_id(logical_date) + return run_type.generate_run_id(logical_date, run_after) diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py index 20e8085fe0d37..59e9038a45d7d 100644 --- a/airflow/timetables/simple.py +++ b/airflow/timetables/simple.py @@ -186,15 +186,16 @@ def generate_run_id( self, *, run_type: DagRunType, - logical_date: DateTime, + logical_date: DateTime | None, data_interval: DataInterval | None, + run_after: DateTime | None = None, session: Session | None = None, events: Collection[AssetEvent] | None = None, **extra, ) -> str: from airflow.models.dagrun import DagRun - return DagRun.generate_run_id(run_type, logical_date) + return DagRun.generate_run_id(run_type, logical_date, run_after) def data_interval_for_events( self, diff --git a/airflow/utils/types.py b/airflow/utils/types.py index 46f295c4ee21a..07d0b1988b650 100644 --- a/airflow/utils/types.py +++ b/airflow/utils/types.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, TypedDict import airflow.sdk.definitions._internal.types +from airflow.utils.strings import get_random_string if TYPE_CHECKING: from datetime import datetime @@ -42,7 +43,11 @@ class DagRunType(str, enum.Enum): def __str__(self) -> str: return self.value - def generate_run_id(self, logical_date: datetime) -> str: + def generate_run_id(self, logical_date: datetime | None, run_after: datetime | None) -> str: + if logical_date is None: + if run_after is None: + raise ValueError("run_after cannot be None") + return run_after + get_random_string() return f"{self}__{logical_date.isoformat()}" @staticmethod diff --git a/airflow/www/views.py b/airflow/www/views.py index 4ce5c6564619f..ba66e2e0ce952 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2230,6 +2230,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): logical_date=logical_date, data_interval=data_interval, run_type=DagRunType.MANUAL, + run_after=data_interval.end, ) try: diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index fde7e9eafa62a..9f12f7939adcc 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3121,7 +3121,7 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker ) def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None: dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily") - run_id = run_id_type.generate_run_id(DEFAULT_DATE) + run_id = run_id_type.generate_run_id(logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE) with pytest.raises(ValueError) as ctx: dag.create_dagrun(