diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst index 0cfe2fa4d9895..d0387c41946ed 100644 --- a/providers/edge3/docs/deployment.rst +++ b/providers/edge3/docs/deployment.rst @@ -51,8 +51,8 @@ Minimum Airflow configuration settings for the Edge Worker to make it running is - Section ``[core]`` - ``execution_api_server_url``: If not set, the base URL from ``edge.api_url`` will be used. For example, - when ``edge.api_url`` is set to ``https://your-hostname-and-port/edge_worker/v1/rpcapi``, it will - default to ``https://your-hostname-and-port/execution/``. + when ``edge.api_url`` is set to ``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will + default to ``https://your-hostname-and-port/subpath/execution`` (starting from Airflow version 3.0.0). - ``executor``: Executor must be set or added to be ``airflow.providers.edge3.executors.EdgeExecutor`` - ``internal_api_secret_key``: An encryption key must be set on api-server and Edge Worker component as shared secret to authenticate traffic. 
It should be a random string like the fernet key diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 48aa4c8d578bd..d1e0bcdcf2896 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -21,13 +21,13 @@ import signal import sys from datetime import datetime +from functools import cache from http import HTTPStatus from multiprocessing import Process from pathlib import Path from subprocess import Popen from time import sleep from typing import TYPE_CHECKING -from urllib.parse import urlparse from lockfile.pidlockfile import remove_existing_pidfile from requests import HTTPError @@ -176,7 +176,19 @@ def _get_state() -> EdgeWorkerState: return EdgeWorkerState.IDLE @staticmethod - def _run_job_via_supervisor(workload) -> int: + @cache + def _execution_api_server_url() -> str: + """Get the execution api server url from config or environment.""" + api_url = conf.get("edge", "api_url") + execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="") + if not execution_api_server_url and api_url: + # Derive execution api url from edge api url as fallback + execution_api_server_url = api_url.replace("edge_worker/v1/rpcapi", "execution") + logger.info("Using execution api server url: %s", execution_api_server_url) + return execution_api_server_url + + @staticmethod + def _run_job_via_supervisor(workload, execution_api_server_url) -> int: from airflow.sdk.execution_time.supervisor import supervise # Ignore ctrl-c in this process -- we don't want to kill _this_ one. 
we let tasks run to completion @@ -186,12 +198,6 @@ def _run_job_via_supervisor(workload) -> int: setproctitle(f"airflow edge worker: {workload.ti.key}") try: - api_url = conf.get("edge", "api_url") - execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="") - if not execution_api_server_url: - parsed = urlparse(api_url) - execution_api_server_url = f"{parsed.scheme}://{parsed.netloc}/execution/" - supervise( # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this. # Same like in airflow/executors/local_executor.py:_execute_work() @@ -215,7 +221,7 @@ def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]: workload: ExecuteTask = edge_job.command process = Process( target=EdgeWorker._run_job_via_supervisor, - kwargs={"workload": workload}, + kwargs={"workload": workload, "execution_api_server_url": EdgeWorker._execution_api_server_url()}, ) process.start() base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE") diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 6c0a4aeb80b3c..427fa3288681f 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -167,12 +167,12 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi "configs, expected_url", [ ( - {("edge", "api_url"): "https://api-endpoint"}, - "https://api-endpoint/execution/", + {("edge", "api_url"): "https://api-host/edge_worker/v1/rpcapi"}, + "https://api-host/execution", ), ( - {("edge", "api_url"): "https://api:1234/endpoint"}, - "https://api:1234/execution/", + {("edge", "api_url"): "https://api:1234/subpath/edge_worker/v1/rpcapi"}, + "https://api:1234/subpath/execution", ), ( { @@ -183,16 +183,25 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi ), ], ) + def test_execution_api_server_url( + self, + 
configs, + expected_url, + ): + with conf_vars(configs): + EdgeWorker._execution_api_server_url.cache_clear() + url = EdgeWorker._execution_api_server_url() + assert url == expected_url + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3+") @patch("airflow.sdk.execution_time.supervisor.supervise") @patch("airflow.providers.edge3.cli.worker.Process") @patch("airflow.providers.edge3.cli.worker.Popen") - def test_use_execution_api_server_url( + def test_supervise_launch( self, mock_popen, mock_process, mock_supervise, - configs, - expected_url, worker_with_job: EdgeWorker, ): mock_popen.side_effect = [MagicMock()] @@ -200,13 +209,12 @@ def test_use_execution_api_server_url( mock_process.side_effect = [mock_process_instance] edge_job = EdgeWorker.jobs.pop().edge_job - with conf_vars(configs): - worker_with_job._launch_job(edge_job) + worker_with_job._launch_job(edge_job) - mock_process_callback = mock_process.call_args.kwargs["target"] - mock_process_callback(workload=MagicMock()) + mock_process_callback = mock_process.call_args.kwargs["target"] + mock_process_callback(workload=MagicMock(), execution_api_server_url="http://mock-url") - assert mock_supervise.call_args.kwargs["server"] == expected_url + assert mock_supervise.call_args.kwargs["server"] == "http://mock-url" @pytest.mark.parametrize( "reserve_result, fetch_result, expected_calls",