From 2ae7a4667d57ccb00884726cd675963030a06ccc Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 27 Oct 2025 21:04:52 +0100 Subject: [PATCH 1/5] Improve default of execution URL if deployed in subpath --- providers/edge3/docs/deployment.rst | 4 ++-- .../src/airflow/providers/edge3/cli/worker.py | 22 ++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst index 0cfe2fa4d9895..d7ec135ca14ca 100644 --- a/providers/edge3/docs/deployment.rst +++ b/providers/edge3/docs/deployment.rst @@ -51,8 +51,8 @@ Minimum Airflow configuration settings for the Edge Worker to make it running is - Section ``[core]`` - ``execution_api_server_url``: If not set, the base URL from ``edge.api_url`` will be used. For example, - when ``edge.api_url`` is set to ``https://your-hostname-and-port/edge_worker/v1/rpcapi``, it will - default to ``https://your-hostname-and-port/execution/``. + when ``edge.api_url`` is set to ``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will + default to ``https://your-hostname-and-port/subpath/execution/`` (starting from version 3.0.0). - ``executor``: Executor must be set or added to be ``airflow.providers.edge3.executors.EdgeExecutor`` - ``internal_api_secret_key``: An encryption key must be set on api-server and Edge Worker component as shared secret to authenticate traffic. It should be a random string like the fernet key diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 48aa4c8d578bd..5dd889c59e6a7 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -21,13 +21,13 @@ import signal import sys from datetime import datetime +from functools import cache from http import HTTPStatus from multiprocessing import Process from pathlib import Path from subprocess import Popen from time import sleep from typing import TYPE_CHECKING -from urllib.parse import urlparse from lockfile.pidlockfile import remove_existing_pidfile from requests import HTTPError @@ -175,6 +175,18 @@ def _get_state() -> EdgeWorkerState: return EdgeWorkerState.MAINTENANCE_MODE return EdgeWorkerState.IDLE + @staticmethod + @cache + def _execution_api_server_url() -> str: + """Get the execution api server url from config or environment.""" + api_url = conf.get("edge", "api_url") + execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="") + if not execution_api_server_url and api_url: + # Derive execution api url from edge api url as fallback + execution_api_server_url = api_url.replace("edge_worker/v1/rpcapi", "execution") + logger.info("Using execution api server url: %s", execution_api_server_url) + return execution_api_server_url + @staticmethod def _run_job_via_supervisor(workload) -> int: from airflow.sdk.execution_time.supervisor import supervise @@ -186,12 +198,6 @@ def _run_job_via_supervisor(workload) -> int: setproctitle(f"airflow edge worker: {workload.ti.key}") try: - api_url = conf.get("edge", "api_url") - execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="") - if not execution_api_server_url: - parsed = urlparse(api_url) - execution_api_server_url = f"{parsed.scheme}://{parsed.netloc}/execution/" - supervise( # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this. # Same like in airflow/executors/local_executor.py:_execute_work() @@ -199,7 +205,7 @@ def _run_job_via_supervisor(workload) -> int: dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, - server=execution_api_server_url, + server=EdgeWorker._execution_api_server_url(), log_path=workload.log_path, ) return 0 From b6ddf1d160aaffa4310b4067397f2e069d94521a Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 27 Oct 2025 21:16:21 +0100 Subject: [PATCH 2/5] Improve default of execution URL if deployed in subpath --- providers/edge3/src/airflow/providers/edge3/cli/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 5dd889c59e6a7..d1e0bcdcf2896 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -188,7 +188,7 @@ def _execution_api_server_url() -> str: return execution_api_server_url @staticmethod - def _run_job_via_supervisor(workload) -> int: + def _run_job_via_supervisor(workload, execution_api_server_url) -> int: from airflow.sdk.execution_time.supervisor import supervise # Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion @@ -205,7 +205,7 @@ def _run_job_via_supervisor(workload) -> int: dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, - server=EdgeWorker._execution_api_server_url(), + server=execution_api_server_url, log_path=workload.log_path, ) return 0 @@ -221,7 +221,7 @@ def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]: workload: ExecuteTask = edge_job.command process = Process( target=EdgeWorker._run_job_via_supervisor, - kwargs={"workload": workload}, + kwargs={"workload": workload, "execution_api_server_url": EdgeWorker._execution_api_server_url()}, ) process.start() base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE") From aaf9e2eeb7c18878741cdbd2ff95856bffc8178f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 27 Oct 2025 22:03:46 +0100 Subject: [PATCH 3/5] Fix unit tests and add coverage for execution URL function --- .../edge3/tests/unit/edge3/cli/test_worker.py | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 6c0a4aeb80b3c..cc50470b2f331 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -167,12 +167,12 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi "configs, expected_url", [ ( - {("edge", "api_url"): "https://api-endpoint"}, - "https://api-endpoint/execution/", + {("edge", "api_url"): "https://api-host/edge_worker/v1/rpcapi"}, + "https://api-host/execution", ), ( - {("edge", "api_url"): "https://api:1234/endpoint"}, - "https://api:1234/execution/", + {("edge", "api_url"): "https://api:1234/subpath/edge_worker/v1/rpcapi"}, + "https://api:1234/subpath/execution", ), ( { @@ -183,16 +183,24 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi ), ], ) + def test_execution_api_server_url( + self, + configs, + expected_url, + ): + with conf_vars(configs): + EdgeWorker._execution_api_server_url.cache_clear() + url = EdgeWorker._execution_api_server_url() + assert url == expected_url + @patch("airflow.sdk.execution_time.supervisor.supervise") @patch("airflow.providers.edge3.cli.worker.Process") @patch("airflow.providers.edge3.cli.worker.Popen") - def test_use_execution_api_server_url( + def test_supervise_launch( self, mock_popen, mock_process, mock_supervise, - configs, - expected_url, worker_with_job: EdgeWorker, ): mock_popen.side_effect = [MagicMock()] @@ -200,13 +208,12 @@ def test_use_execution_api_server_url( mock_process.side_effect = [mock_process_instance] edge_job = EdgeWorker.jobs.pop().edge_job - with conf_vars(configs): - worker_with_job._launch_job(edge_job) + worker_with_job._launch_job(edge_job) - mock_process_callback = mock_process.call_args.kwargs["target"] - mock_process_callback(workload=MagicMock()) + mock_process_callback = mock_process.call_args.kwargs["target"] + mock_process_callback(workload=MagicMock(), execution_api_server_url="http://mock-url") - assert mock_supervise.call_args.kwargs["server"] == expected_url + assert mock_supervise.call_args.kwargs["server"] == "http://mock-url" @pytest.mark.parametrize( "reserve_result, fetch_result, expected_calls", From 22229dd5b941f2e8656e2f9ec0735f5a4bc20831 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 27 Oct 2025 22:34:51 +0100 Subject: [PATCH 4/5] Fix unit tests and add coverage for execution URL function, ups need mark as test for AF3 only --- providers/edge3/tests/unit/edge3/cli/test_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index cc50470b2f331..427fa3288681f 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -193,6 +193,7 @@ def test_execution_api_server_url( url = EdgeWorker._execution_api_server_url() assert url == expected_url + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3+") @patch("airflow.sdk.execution_time.supervisor.supervise") @patch("airflow.providers.edge3.cli.worker.Process") @patch("airflow.providers.edge3.cli.worker.Popen") From 5cb4622cc5481b48b5aa13b2ecf844211b815a5b Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Tue, 28 Oct 2025 13:15:46 +0100 Subject: [PATCH 5/5] Apply suggestion from @amoghrajesh Co-authored-by: Amogh Desai --- providers/edge3/docs/deployment.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst index d7ec135ca14ca..d0387c41946ed 100644 --- a/providers/edge3/docs/deployment.rst +++ b/providers/edge3/docs/deployment.rst @@ -52,7 +52,7 @@ Minimum Airflow configuration settings for the Edge Worker to make it running is - ``execution_api_server_url``: If not set, the base URL from ``edge.api_url`` will be used. For example, when ``edge.api_url`` is set to ``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will - default to ``https://your-hostname-and-port/subpath/execution/`` (starting from version 3.0.0). + default to ``https://your-hostname-and-port/subpath/execution/`` (starting from version Airflow version 3.0.0). - ``executor``: Executor must be set or added to be ``airflow.providers.edge3.executors.EdgeExecutor`` - ``internal_api_secret_key``: An encryption key must be set on api-server and Edge Worker component as shared secret to authenticate traffic. It should be a random string like the fernet key