diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index a860810b74606..a578225ea8b08 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2133,6 +2133,16 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st partition_dag_ids: set[str] = set() pending_apdr_ids = [apdr.id for apdr in pending_apdrs] + evaluator = AssetEvaluator(session) + apdr_query = ( + select(AssetPartitionDagRun) + .where(AssetPartitionDagRun.created_dag_run_id.is_(None)) + ) + for apdr in session.scalars( + with_row_locks(apdr_query, of=AssetPartitionDagRun, session=session, skip_locked=True) +): + if TYPE_CHECKING: + assert apdr.target_dag_id # {"dag_id": Serialized Dag} serialized_dags: dict[str, SerializedDAG] = {} for serdag in serdags_by_dag_id.values():