diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 74d59addb35..931c7438ad2 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -53,6 +53,7 @@ distributed:
         - distributed.http.scheduler.prometheus
         - distributed.http.scheduler.info
         - distributed.http.scheduler.json
+        - distributed.http.scheduler.api
         - distributed.http.health
         - distributed.http.proxy
         - distributed.http.statics
diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py
new file mode 100644
index 00000000000..3f4e80194fe
--- /dev/null
+++ b/distributed/http/scheduler/api.py
@@ -0,0 +1,99 @@
+from __future__ import annotations
+
+import json
+
+from distributed.http.utils import RequestHandler
+
+
+def _reason_phrase(exc: Exception) -> str | None:
+    """Return ``str(exc)`` in a form usable as an HTTP reason phrase.
+
+    The reason phrase travels on the response status line, so it must be a
+    single line of printable text: every other character becomes a space
+    and runs of whitespace are collapsed.  An empty result is reported as
+    ``None`` so that Tornado substitutes the standard phrase for the code.
+    """
+    printable = "".join(ch if " " <= ch <= "~" else " " for ch in str(exc))
+    return " ".join(printable.split()) or None
+
+
+def _write_server_error(handler: RequestHandler, exc: Exception) -> None:
+    """Answer the request being handled with a 500 and a generic JSON body."""
+    handler.set_status(500, _reason_phrase(exc))
+    handler.write(json.dumps({"Error": "Internal Server Error"}))
+
+
+class APIHandler(RequestHandler):
+    """Root of the v1 HTTP API; replies with a plain-text banner."""
+
+    def get(self):
+        self.write("API V1")
+        self.set_header("Content-Type", "text/plain")
+
+
+class RetireWorkersHandler(RequestHandler):
+    """Retire the workers selected by the JSON object in the request body.
+
+    The object holds either ``"n"``, a number of workers that the scheduler
+    chooses through ``workers_to_close``, or ``"workers"``, a list of worker
+    addresses.  A non-zero ``"n"`` wins over ``"workers"``.  The reply is
+    the JSON-encoded return value of ``scheduler.retire_workers``.
+    """
+
+    async def post(self):
+        self.set_header("Content-Type", "text/json")
+        scheduler = self.server
+        try:
+            params = json.loads(self.request.body)
+            n_workers = params.get("n", 0)
+            if n_workers:
+                workers = scheduler.workers_to_close(n=n_workers)
+            else:
+                workers = params.get("workers", [])
+            workers_info = await scheduler.retire_workers(workers=workers)
+            self.write(json.dumps(workers_info))
+        except Exception as e:
+            _write_server_error(self, e)
+
+
+class GetWorkersHandler(RequestHandler):
+    """List the name and address of every worker in ``scheduler.workers``."""
+
+    def get(self):
+        self.set_header("Content-Type", "text/json")
+        scheduler = self.server
+        try:
+            response = {
+                "num_workers": len(scheduler.workers),
+                "workers": [
+                    {"name": ws.name, "address": ws.address}
+                    for ws in scheduler.workers.values()
+                ],
+            }
+            self.write(json.dumps(response))
+        except Exception as e:
+            _write_server_error(self, e)
+
+
+class AdaptiveTargetHandler(RequestHandler):
+    """Report the value of ``scheduler.adaptive_target()`` as JSON."""
+
+    def get(self):
+        self.set_header("Content-Type", "text/json")
+        scheduler = self.server
+        try:
+            desired_workers = scheduler.adaptive_target()
+            response = {
+                "workers": desired_workers,
+            }
+            self.write(json.dumps(response))
+        except Exception as e:
+            _write_server_error(self, e)
+
+
+routes: list[tuple] = [
+    ("/api/v1", APIHandler, {}),
+    ("/api/v1/retire_workers", RetireWorkersHandler, {}),
+    ("/api/v1/get_workers", GetWorkersHandler, {}),
+    ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}),
+]
diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py
index dce785fdce5..b1213bc95bd 100644
--- a/distributed/http/scheduler/tests/test_scheduler_http.py
+++ b/distributed/http/scheduler/tests/test_scheduler_http.py
@@ -2,6 +2,7 @@
 import json
 import re
 
+import aiohttp
 import pytest
 
 pytest.importorskip("bokeh")
@@ -245,3 +246,53 @@ async def test_eventstream(c, s, a, b):
     )
     assert "websocket" in str(s.plugins).lower()
     ws_client.close()
+
+
+@gen_cluster(client=True, clean_kwargs={"threads": False})
+async def test_api(c, s, a, b):
+    async with aiohttp.ClientSession() as session:
+        async with session.get(
+            "http://localhost:%d/api/v1" % s.http_server.port
+        ) as resp:
+            assert resp.status == 200
+            assert resp.headers["Content-Type"] == "text/plain"
+            assert (await resp.text()) == "API V1"
+
+
+@gen_cluster(client=True, clean_kwargs={"threads": False})
+async def test_retire_workers(c, s, a, b):
+    async with aiohttp.ClientSession() as session:
+        params = {"workers": [a.address, b.address]}
+        async with session.post(
+            "http://localhost:%d/api/v1/retire_workers" % s.http_server.port,
+            json=params,
+        ) as resp:
+            assert resp.status == 200
+            assert resp.headers["Content-Type"] == "text/json"
+            retired_workers_info = json.loads(await resp.text())
+            assert len(retired_workers_info) == 2
+
+
+@gen_cluster(client=True, clean_kwargs={"threads": False})
+async def test_get_workers(c, s, a, b):
+    async with aiohttp.ClientSession() as session:
+        async with session.get(
+            "http://localhost:%d/api/v1/get_workers" % s.http_server.port
+        ) as resp:
+            assert resp.status == 200
+            assert resp.headers["Content-Type"] == "text/json"
+            workers_info = json.loads(await resp.text())["workers"]
+            workers_address = [worker.get("address") for worker in workers_info]
+            assert set(workers_address) == {a.address, b.address}
+
+
+@gen_cluster(client=True, clean_kwargs={"threads": False})
+async def test_adaptive_target(c, s, a, b):
+    async with aiohttp.ClientSession() as session:
+        async with session.get(
+            "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port
+        ) as resp:
+            assert resp.status == 200
+            assert resp.headers["Content-Type"] == "text/json"
+            num_workers = json.loads(await resp.text())["workers"]
+            assert num_workers == 0
diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst
index fe076a08653..31bb62292a7 100644
--- a/docs/source/http_services.rst
+++ b/docs/source/http_services.rst
@@ -49,6 +49,31 @@ Pages and JSON endpoints served by the scheduler
 - ``/statics/()``: static file content (CSS, etc)
 - ``/stealing``: worker occupancy metrics, to evaluate task stealing
 
+Scheduler API
+-------------
+
+Scheduler methods exposed by the API, with an example of the request body for
+those that take one
+
+- ``/api/v1/retire_workers`` (POST): retire workers on the scheduler, either the
+  ones listed under ``workers`` or, when ``n`` is given, that many workers
+  chosen by the scheduler
+
+.. code-block:: json
+
+    {
+        "workers": ["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"]
+    }
+
+.. code-block:: json
+
+    {
+        "n": 2
+    }
+
+- ``/api/v1/get_workers`` (GET): get all workers on the scheduler
+- ``/api/v1/adaptive_target`` (GET): get the target number of workers based on the scheduler's load
+
 Individual bokeh plots
 ----------------------
 