From b164b9834ac1e36167cd42b236740ce7744c2681 Mon Sep 17 00:00:00 2001 From: Ramakrishna Date: Fri, 5 Jun 2026 12:49:41 +0530 Subject: [PATCH] fix(scheduler): add row lock to AssetPartitionDagRun fetch to prevent duplicate DagRun creation in HA --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index f5cb2d8287023..051b3cec6b445 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1883,9 +1883,13 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st partition_dag_ids: set[str] = set() evaluator = AssetEvaluator(session) + apdr_query = ( + select(AssetPartitionDagRun) + .where(AssetPartitionDagRun.created_dag_run_id.is_(None)) + ) for apdr in session.scalars( - select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None)) - ): + with_row_locks(apdr_query, of=AssetPartitionDagRun, session=session, skip_locked=True) +): if TYPE_CHECKING: assert apdr.target_dag_id