From 58e126625d360f95bdbb2dc839626b74d524b568 Mon Sep 17 00:00:00 2001 From: Shlomit-B <76184325+Shlomit-B@users.noreply.github.com> Date: Tue, 12 Aug 2025 13:36:46 +0300 Subject: [PATCH] [v3-0-test] Skip running db queries for Spans when they are disabled (#54075) Skips recording span and metrics in the scheduler when tracing or metrics are disabled. This avoids unnecessary function calls and improves efficiency. Changes Made: - Added functions to check config values - Used these helpers to avoid computing span and pool metrics in _run_scheduler_loop when not needed (cherry picked from commit 67f55654b5f7bb3c347beebbba10bdac40a975f1) Co-authored-by: Shlomit-B <76184325+Shlomit-B@users.noreply.github.com> closes: #53405 --- .../src/airflow/jobs/scheduler_job_runner.py | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c46f99d703dd9..0a47913ad5039 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -789,6 +789,20 @@ def _process_task_event_logs(log_records: deque[Log], session: Session): objects = (log_records.popleft() for _ in range(len(log_records))) session.bulk_save_objects(objects=objects, preserve_order=False) + @staticmethod + def _is_metrics_enabled(): + return any( + [ + conf.getboolean("metrics", "statsd_datadog_enabled", fallback=False), + conf.getboolean("metrics", "statsd_on", fallback=False), + conf.getboolean("metrics", "otel_on", fallback=False), + ] + ) + + @staticmethod + def _is_tracing_enabled(): + return conf.getboolean("traces", "otel_on") + def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: return SchedulerJobRunner.process_executor_events( executor=executor, @@ -1256,15 +1270,17 @@ def _run_scheduler_loop(self) -> None: self._mark_backfills_complete, ) - timers.call_regular_interval( - conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), - self._emit_pool_metrics, - ) + if self._is_metrics_enabled() or self._is_tracing_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), + self._emit_pool_metrics, + ) - timers.call_regular_interval( - conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), - self._emit_running_ti_metrics, - ) + if self._is_metrics_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), + self._emit_running_ti_metrics, + ) timers.call_regular_interval( conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0), @@ -1308,7 +1324,8 @@ def _run_scheduler_loop(self) -> None: ) with create_session() as session: - self._end_spans_of_externally_ended_ops(session) + if self._is_tracing_enabled(): + self._end_spans_of_externally_ended_ops(session) # This will schedule for as many executors as possible. num_queued_tis = self._do_scheduling(session)