Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions providers/edge3/docs/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ Minimum Airflow configuration settings for the Edge Worker to make it running is
- Section ``[core]``

- ``execution_api_server_url``: If not set, the base URL from ``edge.api_url`` will be used. For example,
when ``edge.api_url`` is set to ``https://your-hostname-and-port/edge_worker/v1/rpcapi``, it will
default to ``https://your-hostname-and-port/execution/``.
when ``edge.api_url`` is set to ``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will
default to ``https://your-hostname-and-port/subpath/execution/`` (starting from Airflow version 3.0.0).
- ``executor``: Executor must be set or added to be ``airflow.providers.edge3.executors.EdgeExecutor``
- ``internal_api_secret_key``: An encryption key must be set on api-server and Edge Worker component as
shared secret to authenticate traffic. It should be a random string like the fernet key
Expand Down
24 changes: 15 additions & 9 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import signal
import sys
from datetime import datetime
from functools import cache
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
from subprocess import Popen
from time import sleep
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from lockfile.pidlockfile import remove_existing_pidfile
from requests import HTTPError
Expand Down Expand Up @@ -176,7 +176,19 @@ def _get_state() -> EdgeWorkerState:
return EdgeWorkerState.IDLE

@staticmethod
def _run_job_via_supervisor(workload) -> int:
@cache
def _execution_api_server_url() -> str:
"""Get the execution api server url from config or environment."""
api_url = conf.get("edge", "api_url")
execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="")
if not execution_api_server_url and api_url:
# Derive execution api url from edge api url as fallback
execution_api_server_url = api_url.replace("edge_worker/v1/rpcapi", "execution")
logger.info("Using execution api server url: %s", execution_api_server_url)
Comment thread
jscheffl marked this conversation as resolved.
Dismissed
return execution_api_server_url

@staticmethod
def _run_job_via_supervisor(workload, execution_api_server_url) -> int:
from airflow.sdk.execution_time.supervisor import supervise

# Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion
Expand All @@ -186,12 +198,6 @@ def _run_job_via_supervisor(workload) -> int:
setproctitle(f"airflow edge worker: {workload.ti.key}")

try:
api_url = conf.get("edge", "api_url")
execution_api_server_url = conf.get("core", "execution_api_server_url", fallback="")
if not execution_api_server_url:
parsed = urlparse(api_url)
execution_api_server_url = f"{parsed.scheme}://{parsed.netloc}/execution/"

supervise(
# This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
# Same like in airflow/executors/local_executor.py:_execute_work()
Expand All @@ -215,7 +221,7 @@ def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
workload: ExecuteTask = edge_job.command
process = Process(
target=EdgeWorker._run_job_via_supervisor,
kwargs={"workload": workload},
kwargs={"workload": workload, "execution_api_server_url": EdgeWorker._execution_api_server_url()},
)
process.start()
base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
Expand Down
32 changes: 20 additions & 12 deletions providers/edge3/tests/unit/edge3/cli/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi
"configs, expected_url",
[
(
{("edge", "api_url"): "https://api-endpoint"},
"https://api-endpoint/execution/",
{("edge", "api_url"): "https://api-host/edge_worker/v1/rpcapi"},
"https://api-host/execution",
),
(
{("edge", "api_url"): "https://api:1234/endpoint"},
"https://api:1234/execution/",
{("edge", "api_url"): "https://api:1234/subpath/edge_worker/v1/rpcapi"},
"https://api:1234/subpath/execution",
),
(
{
Expand All @@ -183,30 +183,38 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi
),
],
)
def test_execution_api_server_url(
self,
configs,
expected_url,
):
with conf_vars(configs):
EdgeWorker._execution_api_server_url.cache_clear()
url = EdgeWorker._execution_api_server_url()
assert url == expected_url

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3+")
@patch("airflow.sdk.execution_time.supervisor.supervise")
@patch("airflow.providers.edge3.cli.worker.Process")
@patch("airflow.providers.edge3.cli.worker.Popen")
def test_use_execution_api_server_url(
def test_supervise_launch(
self,
mock_popen,
mock_process,
mock_supervise,
configs,
expected_url,
worker_with_job: EdgeWorker,
):
mock_popen.side_effect = [MagicMock()]
mock_process_instance = MagicMock()
mock_process.side_effect = [mock_process_instance]

edge_job = EdgeWorker.jobs.pop().edge_job
with conf_vars(configs):
worker_with_job._launch_job(edge_job)
worker_with_job._launch_job(edge_job)

mock_process_callback = mock_process.call_args.kwargs["target"]
mock_process_callback(workload=MagicMock())
mock_process_callback = mock_process.call_args.kwargs["target"]
mock_process_callback(workload=MagicMock(), execution_api_server_url="http://mock-url")

assert mock_supervise.call_args.kwargs["server"] == expected_url
assert mock_supervise.call_args.kwargs["server"] == "http://mock-url"

@pytest.mark.parametrize(
"reserve_result, fetch_result, expected_calls",
Expand Down