From f72f4c8935d42edf0cc16ec897a82cbbe17df76c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sat, 27 Mar 2021 01:40:42 +0800 Subject: [PATCH] Batch send to not overload multiprocessing pipe This breaks the loops sending callbacks to the multiprocessing pipe into 100-size batches, and call DagFileProcessorAgent.heartbeat() to consume the pipe between the batches. This avoids the pipe from becoming full, which would make Pipe.send() block and deadlocking the process. --- airflow/jobs/scheduler_job.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 3b801811718fb..b2fb28c259a0e 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -72,6 +72,8 @@ DR = models.DagRun DM = models.DagModel +CALLBACK_SEND_BATCH_SIZE = 100 + class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, MultiprocessingStartMethodMixin): """Runs DAG processing in a separate process using DagFileProcessor @@ -1219,7 +1221,12 @@ def _process_executor_events(self, session: Session = None) -> int: # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all() - for ti in tis: + + # Send tasks in batches of 100, and call the agent's heartbeat() + # to process the multiprocessing pipe. This keep the pipe from + # being full, which would block pipe.send() and cause deadlocking + # when we have a lot of tasks to send. + for i, ti in enumerate(tis, 1): try_number = ti_primary_key_to_try_number_map[ti.key.primary] buffer_key = ti.key.with_try_number(try_number) state, info = event_buffer.pop(buffer_key) @@ -1245,6 +1252,9 @@ def _process_executor_events(self, session: Session = None) -> int: self.processor_agent.send_callback_to_execute(request) + if i % CALLBACK_SEND_BATCH_SIZE == 0: + self.processor_agent.heartbeat() + return len(event_buffer) def _execute(self) -> None: @@ -1514,7 +1524,11 @@ def _do_scheduling(self, session) -> int: for dag_id, execution_date in query: active_runs_by_dag_id[dag_id].add(execution_date) - for dag_run in dag_runs: + # Send dag_runs in batches of 100, and call the agent's heartbeat() + # to process the multiprocessing pipe. This keep the pipe from + # being full, which would block pipe.send() and cause deadlocking + # when we have a lot of dags to send. + for i, dag_run in enumerate(dag_runs, 1): # Use try_except to not stop the Scheduler when a Serialized DAG is not found # This takes care of Dynamic DAGs especially # SerializedDagNotFound should not happen here in the same loop because the DagRun would @@ -1526,6 +1540,8 @@ def _do_scheduling(self, session) -> int: except SerializedDagNotFound: self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id) continue + if i % CALLBACK_SEND_BATCH_SIZE == 0: + self.processor_agent.heartbeat() guard.commit()