From 32fb5e3d65a27c3fc2069f8c40478aaef11681c3 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 14 Apr 2026 17:41:14 -0700 Subject: [PATCH 1/3] clean up redundant api server uri generation --- .../src/airflow/executors/base_executor.py | 29 ++++++++++++++----- .../src/airflow/executors/local_executor.py | 17 ++--------- .../unit/executors/test_local_executor.py | 10 +++---- .../celery/executors/celery_executor_utils.py | 10 ++----- 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index cc708545cfac0..59804b8bcda41 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -43,6 +43,25 @@ PARALLELISM: int = conf.getint("core", "PARALLELISM") + +def get_execution_api_server_url(conf_source: AirflowConfigParser | ExecutorConf = conf) -> str: + """ + Resolve the execution API server URL from configuration. + + :param conf_source: Configuration source to read from. Defaults to the global ``conf``. + Team-specific executors can pass their own config (e.g. ``ExecutorConf``) to resolve + a team-specific URL. + """ + base_url = conf_source.get("api", "base_url", fallback="/") + + # Both .get() statements have a fallback= which guarantee a str, mypy sees + # .get()'s return is typed as str | None so we have to add redundant checks + if not base_url or base_url.startswith("/"): + base_url = f"http://localhost:8080{base_url}" + default_execution_api_server = f"{base_url.rstrip('/')}/execution/" + return conf_source.get("core", "execution_api_server_url", fallback=default_execution_api_server) # type: ignore[return-value] + + if TYPE_CHECKING: import argparse from datetime import datetime @@ -53,6 +72,7 @@ from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest from airflow.cli.cli_config import GroupCommand + from airflow.configuration import AirflowConfigParser from airflow.executors.executor_utils import ExecutorName from airflow.executors.workloads import ExecutorWorkload from airflow.executors.workloads.types import WorkloadKey @@ -617,14 +637,7 @@ def run_workload( # Resolve server URL from config when not explicitly provided. # For example, team-specific executors may wish to pass their own server URL. if server is None: - base_url = conf.get("api", "base_url", fallback="/") - if base_url.startswith("/"): - base_url = f"http://localhost:8080{base_url}" - server = conf.get( - "core", - "execution_api_server_url", - fallback=f"{base_url.rstrip('/')}/execution/", - ) + server = get_execution_api_server_url() if isinstance(workload, ExecuteTask): from airflow.sdk.execution_time.supervisor import supervise_task diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 46c5a3acafb0e..74ff0889b732b 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -35,7 +35,7 @@ import structlog -from airflow.executors.base_executor import BaseExecutor +from airflow.executors.base_executor import BaseExecutor, get_execution_api_server_url # add logger to parameter of setproctitle to support logging if sys.platform == "darwin": @@ -50,19 +50,6 @@ from airflow.executors.workloads.types import WorkloadResultType -def _get_execution_api_server_url(team_conf) -> str: - """ - Resolve the execution API server URL from team-specific configuration. - - :param team_conf: Team-specific executor configuration (ExecutorConf or AirflowConfigParser) - """ - base_url = team_conf.get("api", "base_url", fallback="/") - if base_url.startswith("/"): - base_url = f"http://localhost:8080{base_url}" - default_execution_api_server = f"{base_url.rstrip('/')}/execution/" - return team_conf.get("core", "execution_api_server_url", fallback=default_execution_api_server) - - def _get_executor_process_title_prefix(team_name: str | None) -> str: """ Build the process title prefix for LocalExecutor workers. @@ -114,7 +101,7 @@ def _run_worker( try: BaseExecutor.run_workload( workload, - server=_get_execution_api_server_url(team_conf), + server=get_execution_api_server_url(team_conf), proctitle=f"{_get_executor_process_title_prefix(team_conf.team_name)} {workload.display_name}", subprocess_logs_to_stdout=True, ) diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py index 0f3c4deca41d2..43d246b9050ce 100644 --- a/airflow-core/tests/unit/executors/test_local_executor.py +++ b/airflow-core/tests/unit/executors/test_local_executor.py @@ -28,8 +28,8 @@ from airflow._shared.timezones import timezone from airflow.executors import workloads -from airflow.executors.base_executor import BaseExecutor, ExecutorConf -from airflow.executors.local_executor import LocalExecutor, _get_execution_api_server_url +from airflow.executors.base_executor import BaseExecutor, ExecutorConf, get_execution_api_server_url +from airflow.executors.local_executor import LocalExecutor from airflow.executors.workloads.base import BundleInfo from airflow.executors.workloads.callback import CallbackDTO from airflow.executors.workloads.task import TaskInstanceDTO @@ -289,7 +289,7 @@ def test_execution_api_server_url_config(self, mock_run_workload, conf_values, e with conf_vars(conf_values): team_conf = ExecutorConf(team_name=None) - BaseExecutor.run_workload(_make_task_workload(), server=_get_execution_api_server_url(team_conf)) + BaseExecutor.run_workload(_make_task_workload(), server=get_execution_api_server_url(team_conf)) mock_run_workload.assert_called_once() assert mock_run_workload.call_args.kwargs["server"] == expected_server @@ -318,7 +318,7 @@ def test_team_and_global_config_isolation(self, mock_run_workload): # Test team-specific config team_conf = ExecutorConf(team_name=team_name) BaseExecutor.run_workload( - _make_task_workload(), server=_get_execution_api_server_url(team_conf) + _make_task_workload(), server=get_execution_api_server_url(team_conf) ) # Verify team-specific server URL was used @@ -330,7 +330,7 @@ def test_team_and_global_config_isolation(self, mock_run_workload): # Test global config (no team) global_conf = ExecutorConf(team_name=None) BaseExecutor.run_workload( - _make_task_workload(), server=_get_execution_api_server_url(global_conf) + _make_task_workload(), server=get_execution_api_server_url(global_conf) ) # Verify default server URL was used diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 5ff62866e8c8d..54da9c22e2fbc 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -42,7 +42,7 @@ from celery.signals import import_modules as celery_import_modules, worker_ready from sqlalchemy import select -from airflow.executors.base_executor import BaseExecutor +from airflow.executors.base_executor import BaseExecutor, get_execution_api_server_url from airflow.providers.celery.version_compat import ( AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_9_PLUS, @@ -250,12 +250,6 @@ def _execute_workload_pre_3_2(input: str) -> None: log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload) - base_url = conf.get("api", "base_url", fallback="/") - # If it's a relative URL, use localhost:8080 as the default. - if base_url.startswith("/"): - base_url = f"http://localhost:8080{base_url}" - default_execution_api_server = f"{base_url.rstrip('/')}/execution/" - try: if isinstance(workload, workloads.ExecuteTask): supervise( @@ -264,7 +258,7 @@ def _execute_workload_pre_3_2(input: str) -> None: dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, - server=conf.get("core", "execution_api_server_url", fallback=default_execution_api_server), + server=get_execution_api_server_url(), log_path=workload.log_path, ) else: From cdf861d305ec895d96199276580500376ff90e13 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 14 Apr 2026 20:23:39 -0700 Subject: [PATCH 2/3] compat fix? --- .../celery/executors/celery_executor_utils.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 54da9c22e2fbc..499f1fcd1aafb 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -42,7 +42,7 @@ from celery.signals import import_modules as celery_import_modules, worker_ready from sqlalchemy import select -from airflow.executors.base_executor import BaseExecutor, get_execution_api_server_url +from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.version_compat import ( AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_9_PLUS, @@ -250,6 +250,18 @@ def _execute_workload_pre_3_2(input: str) -> None: log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload) + # TODO: Remove the inline resolution once the celery provider's minimum airflow version is >= 3.2.1 + if AIRFLOW_V_3_2_PLUS: + from airflow.executors.base_executor import get_execution_api_server_url + + server = get_execution_api_server_url() + else: + base_url = conf.get("api", "base_url", fallback="/") + if base_url.startswith("/"): + base_url = f"http://localhost:8080{base_url}" + default_execution_api_server = f"{base_url.rstrip('/')}/execution/" + server = conf.get("core", "execution_api_server_url", fallback=default_execution_api_server) + try: if isinstance(workload, workloads.ExecuteTask): supervise( @@ -258,7 +270,7 @@ def _execute_workload_pre_3_2(input: str) -> None: dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, - server=get_execution_api_server_url(), + server=server, log_path=workload.log_path, ) else: From 412ecc9e63afe6622dd81dca251765dda8e3b2b0 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 15 Apr 2026 17:13:43 -0700 Subject: [PATCH 3/3] pr fixes --- .../src/airflow/executors/base_executor.py | 11 ++++++----- .../celery/executors/celery_executor_utils.py | 18 ++++++------------ 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index 59804b8bcda41..75ce7c9f5dfa4 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -24,7 +24,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from functools import cached_property -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast import pendulum @@ -53,13 +53,14 @@ def get_execution_api_server_url(conf_source: AirflowConfigParser | ExecutorConf a team-specific URL. """ base_url = conf_source.get("api", "base_url", fallback="/") - - # Both .get() statements have a fallback= which guarantee a str, mypy sees - # .get()'s return is typed as str | None so we have to add redundant checks + # ExecutorConf.get() is typed as str | None even when fallback= guarantees a str, + # so the `not base_url` guard and the cast() below keep mypy happy. if not base_url or base_url.startswith("/"): base_url = f"http://localhost:8080{base_url}" default_execution_api_server = f"{base_url.rstrip('/')}/execution/" - return conf_source.get("core", "execution_api_server_url", fallback=default_execution_api_server) # type: ignore[return-value] + return cast( + "str", conf_source.get("core", "execution_api_server_url", fallback=default_execution_api_server) + ) if TYPE_CHECKING: diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 499f1fcd1aafb..5ff62866e8c8d 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -250,17 +250,11 @@ def _execute_workload_pre_3_2(input: str) -> None: log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload) - # TODO: Remove the inline resolution once the celery provider's minimum airflow version is >= 3.2.1 - if AIRFLOW_V_3_2_PLUS: - from airflow.executors.base_executor import get_execution_api_server_url - - server = get_execution_api_server_url() - else: - base_url = conf.get("api", "base_url", fallback="/") - if base_url.startswith("/"): - base_url = f"http://localhost:8080{base_url}" - default_execution_api_server = f"{base_url.rstrip('/')}/execution/" - server = conf.get("core", "execution_api_server_url", fallback=default_execution_api_server) + base_url = conf.get("api", "base_url", fallback="/") + # If it's a relative URL, use localhost:8080 as the default. + if base_url.startswith("/"): + base_url = f"http://localhost:8080{base_url}" + default_execution_api_server = f"{base_url.rstrip('/')}/execution/" try: if isinstance(workload, workloads.ExecuteTask): @@ -270,7 +264,7 @@ def _execute_workload_pre_3_2(input: str) -> None: dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, - server=server, + server=conf.get("core", "execution_api_server_url", fallback=default_execution_api_server), log_path=workload.log_path, ) else: