diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index d93d3594cae29..a1fd7a9173bc8 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -656,8 +656,6 @@ def _per_task_process(key, ti: TaskInstance, session): _per_task_process(key, ti, session) try: session.commit() - # break the retry loop - break except OperationalError: self.log.error( "Failed to commit task state due to operational error. " @@ -669,6 +667,9 @@ def _per_task_process(key, ti: TaskInstance, session): if i == max_attempts - 1: raise # retry the loop + else: + # break the retry loop + break except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e: self.log.debug(e) @@ -815,11 +816,10 @@ def _execute_dagruns( for dagrun_info in dagrun_infos: for dag in self._get_dag_with_subdags(): dag_run = self._get_dag_run(dagrun_info, dag, session=session) - if dag_run is None: - continue - tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session) - ti_status.active_runs.append(dag_run) - ti_status.to_run.update(tis_map or {}) + if dag_run is not None: + tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session) + ti_status.active_runs.append(dag_run) + ti_status.to_run.update(tis_map or {}) processed_dag_run_dates = self._process_backfill_task_instances( ti_status=ti_status, diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 52f6fb737a442..1507a6f06f206 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -897,13 +897,11 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) ) for dag_run in paused_runs: dag = self.dagbag.get_dag(dag_run.dag_id, session=session) - if dag is None: - continue - - dag_run.dag = dag - _, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session) - if callback_to_run: - self._send_dag_callbacks_to_processor(dag, callback_to_run) + if dag is not None: + dag_run.dag = dag + _, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session) + if callback_to_run: + self._send_dag_callbacks_to_processor(dag, callback_to_run) except Exception as e: # should not fail the scheduler self.log.exception("Failed to update dag run state for paused dags due to %s", e) @@ -1073,13 +1071,12 @@ def _do_scheduling(self, session: Session) -> int: ) for dag_run, callback_to_run in callback_tuples: dag = cached_get_dag(dag_run.dag_id) - - if not dag: + if dag: + # Sending callbacks there as in standalone_dag_processor they are adding to the database, + # so it must be done outside of prohibit_commit. + self._send_dag_callbacks_to_processor(dag, callback_to_run) + else: self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id) - continue - # Sending callbacks there as in standalone_dag_processor they are adding to the database, - # so it must be done outside of prohibit_commit. - self._send_dag_callbacks_to_processor(dag, callback_to_run) with prohibit_commit(session) as guard: # Without this, the session has an invalid view of the DB