From aa571ff3a309056cfb2ddf20a277bd1be203fa5f Mon Sep 17 00:00:00 2001 From: rapsealk Date: Sun, 26 Apr 2026 00:15:25 +0900 Subject: [PATCH] Fix scheduler/triggerer deadlock on task_instance for deferrable tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scheduler's check_trigger_timeouts() and the triggerer's Trigger.clean_unused() both issued bulk UPDATEs against task_instance rows referenced via different index paths (state+trigger_timeout vs trigger_id), letting InnoDB acquire row+gap locks in different orders and producing classic A-B / B-A deadlocks under HA scheduler deployments — especially with multiple scheduler replicas. Both writers now select candidate ids in primary-key order with SELECT ... FOR UPDATE SKIP LOCKED, then UPDATE ... WHERE id IN (...) in bounded batches. The deterministic PK lock order eliminates the cross-index deadlock; SKIP LOCKED keeps concurrent scheduler replicas from blocking each other. closes: #65818 Assisted-by: Claude Opus 4.7 (1M context) --- .../src/airflow/jobs/scheduler_job_runner.py | 60 +++++++++++++------ airflow-core/src/airflow/models/trigger.py | 45 +++++++++++--- .../tests/unit/jobs/test_scheduler_job.py | 44 ++++++++++++++ .../tests/unit/models/test_trigger.py | 34 +++++++++++ 4 files changed, 157 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 67115ba757805..c55bc12705093 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -127,6 +127,14 @@ TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule" """:meta private:""" +_TRIGGER_TIMEOUT_BATCH_SIZE = 1000 +"""Max rows per batch in :meth:`SchedulerJobRunner.check_trigger_timeouts`. + +Bounding the batch size narrows the row-lock range so concurrent scheduler replicas +and the triggerer's ``Trigger.clean_unused`` do not deadlock on the same +``task_instance`` rows (issue #65818). +""" + def _eager_load_dag_run_for_validation() -> tuple[LoaderOption, LoaderOption]: """ @@ -2878,25 +2886,43 @@ def check_trigger_timeouts( self, max_retries: int = MAX_DB_RETRIES, session: Session = NEW_SESSION ) -> None: """Mark any "deferred" task as failed if the trigger or execution timeout has passed.""" - for attempt in run_with_db_retries(max_retries, logger=self.log): - with attempt: - result = session.execute( - update(TI) - .where( - TI.state == TaskInstanceState.DEFERRED, - TI.trigger_timeout < timezone.utcnow(), + # Process candidates in primary-key-ordered batches with SKIP LOCKED so concurrent + # scheduler replicas (and the triggerer's clean_unused) don't deadlock on overlapping + # task_instance rows. Tracked: https://github.com/apache/airflow/issues/65818 + while True: + for attempt in run_with_db_retries(max_retries, logger=self.log): + with attempt: + now = timezone.utcnow() + candidates = ( + select(TI.id) + .where( + TI.state == TaskInstanceState.DEFERRED, + TI.trigger_timeout < now, + ) + .order_by(TI.id) + .limit(_TRIGGER_TIMEOUT_BATCH_SIZE) ) - .values( - state=TaskInstanceState.SCHEDULED, - next_method=TRIGGER_FAIL_REPR, - next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT}, - scheduled_dttm=timezone.utcnow(), - trigger_id=None, + ids = list( + session.scalars( + with_row_locks(candidates, of=TI, session=session, skip_locked=True) + ).all() ) - ) - num_timed_out_tasks = getattr(result, "rowcount", 0) - if num_timed_out_tasks: - self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks) + if ids: + session.execute( + update(TI) + .where(TI.id.in_(ids)) + .values( + state=TaskInstanceState.SCHEDULED, + next_method=TRIGGER_FAIL_REPR, + next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT}, + scheduled_dttm=now, + trigger_id=None, + ) + .execution_options(synchronize_session=False) + ) + self.log.info("Timed out %i deferred tasks without fired triggers", len(ids)) + if len(ids) < _TRIGGER_TIMEOUT_BATCH_SIZE: + break # [START find_and_purge_task_instances_without_heartbeats] def _find_and_purge_task_instances_without_heartbeats(self) -> None: diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 9707995288566..74d53c1efd6e1 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -55,6 +55,13 @@ :meta private: """ +_TRIGGER_ID_CLEANUP_BATCH_SIZE = 1000 +"""Max rows per batch when ``Trigger.clean_unused`` clears stale ``trigger_id`` values. + +Bounding the batch size keeps the row-lock range narrow so the triggerer's cleanup +does not deadlock against the scheduler's ``check_trigger_timeouts`` (issue #65818). +""" + log = logging.getLogger(__name__) @@ -226,16 +233,36 @@ def clean_unused(cls, session: Session = NEW_SESSION) -> None: Triggers have a one-to-many relationship to task instances, so we need to clean those up first. Afterward we can drop the triggers not referenced by anyone. """ - # Update all task instances with trigger IDs that are not DEFERRED to remove them - for attempt in run_with_db_retries(): - with attempt: - session.execute( - update(TaskInstance) - .where( - TaskInstance.state != TaskInstanceState.DEFERRED, TaskInstance.trigger_id.is_not(None) + # Clear trigger_id on non-deferred task instances in batches, locking by primary key + # with SKIP LOCKED so we don't deadlock against the scheduler's check_trigger_timeouts() + # which writes the same column from a different index path. + # Tracked: https://github.com/apache/airflow/issues/65818 + while True: + for attempt in run_with_db_retries(): + with attempt: + candidates = ( + select(TaskInstance.id) + .where( + TaskInstance.state != TaskInstanceState.DEFERRED, + TaskInstance.trigger_id.is_not(None), + ) + .order_by(TaskInstance.id) + .limit(_TRIGGER_ID_CLEANUP_BATCH_SIZE) ) - .values(trigger_id=None) - ) + ti_ids = list( + session.scalars( + with_row_locks(candidates, session, skip_locked=True, of=TaskInstance) + ).all() + ) + if ti_ids: + session.execute( + update(TaskInstance) + .where(TaskInstance.id.in_(ti_ids)) + .values(trigger_id=None) + .execution_options(synchronize_session=False) + ) + if len(ti_ids) < _TRIGGER_ID_CLEANUP_BATCH_SIZE: + break # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them ids = ( diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 061a87e3aa420..24aef0f25cfa5 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -6831,6 +6831,50 @@ def test_timeout_triggers(self, dag_maker): assert ti1.next_method == "__fail__" assert ti2.state == State.DEFERRED + def test_timeout_triggers_processes_more_than_one_batch(self, dag_maker, monkeypatch): + """ + Tests that ``check_trigger_timeouts`` flips every timed-out deferred TI even when + more rows match than fit in a single batch. The batched, primary-key-ordered + + SKIP LOCKED implementation is what avoids deadlocks against + ``Trigger.clean_unused`` (issue #65818). + """ + import airflow.jobs.scheduler_job_runner as scheduler_module + + monkeypatch.setattr(scheduler_module, "_TRIGGER_TIMEOUT_BATCH_SIZE", 2) + + session = settings.Session() + with dag_maker( + dag_id="test_timeout_triggers_batched", + start_date=DEFAULT_DATE, + schedule="@once", + max_active_runs=5, + session=session, + ): + EmptyOperator(task_id="dummy1") + + tis = [] + past = timezone.utcnow() - datetime.timedelta(seconds=60) + for i in range(5): + dr = dag_maker.create_dagrun( + run_id=f"batched_{i}", + logical_date=DEFAULT_DATE + datetime.timedelta(seconds=i), + ) + ti = dr.get_task_instance("dummy1", session) + ti.state = State.DEFERRED + ti.trigger_timeout = past + tis.append(ti) + session.flush() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + self.job_runner.check_trigger_timeouts(session=session) + + for ti in tis: + session.refresh(ti) + assert ti.state == State.SCHEDULED + assert ti.next_method == "__fail__" + def test_retry_on_db_error_when_update_timeout_triggers(self, dag_maker, testing_dag_bundle, session): """ Tests that it will retry on DB error like deadlock when updating timeout triggers. diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index dfd0f2e99cd0a..9f75ba34e8657 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -164,6 +164,40 @@ def test_clean_unused(session, dag_maker): assert {result.id for result in results} == {trigger1.id, trigger4.id, trigger5.id, trigger6.id} +def test_clean_unused_clears_trigger_id_in_batches(session, dag_maker, monkeypatch): + """ + Tests that ``Trigger.clean_unused`` clears ``trigger_id`` on every eligible task instance + even when more rows match the predicate than fit in a single batch. + + The batched, primary-key-ordered + SKIP LOCKED implementation is what avoids deadlocks + against ``SchedulerJobRunner.check_trigger_timeouts`` (issue #65818). + """ + import airflow.models.trigger as trigger_module + + monkeypatch.setattr(trigger_module, "_TRIGGER_ID_CLEANUP_BATCH_SIZE", 2) + + triggers = [Trigger(classpath=f"airflow.triggers.testing.SuccessTrigger{i}", kwargs={}) for i in range(5)] + session.add_all(triggers) + session.flush() + + with dag_maker(session=session, dag_id="test_clean_unused_batched"): + for i in range(5): + EmptyOperator(task_id=f"fake{i}") + + dr = dag_maker.create_dagrun(logical_date=timezone.utcnow()) + tis = {ti.task_id: ti for ti in dr.task_instances} + for i in range(5): + tis[f"fake{i}"].state = State.SUCCESS + tis[f"fake{i}"].trigger_id = triggers[i].id + session.flush() + + Trigger.clean_unused(session=session) + + for i in range(5): + session.refresh(tis[f"fake{i}"]) + assert tis[f"fake{i}"].trigger_id is None, f"trigger_id should be cleared on fake{i}" + + @patch.object(TriggererCallback, "handle_event") def test_submit_event(mock_callback_handle_event, session, create_task_instance): """