From a954646ea4b200cf65762889ebdd876be8dcd142 Mon Sep 17 00:00:00 2001 From: Matt711 Date: Wed, 4 May 2022 17:39:23 -0400 Subject: [PATCH 01/15] Add HTTP API to scheduler --- distributed/distributed.yaml | 1 + distributed/http/scheduler/api.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 distributed/http/scheduler/api.py diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 19baf8343b0..8f1aaf59922 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -52,6 +52,7 @@ distributed: - distributed.http.scheduler.prometheus - distributed.http.scheduler.info - distributed.http.scheduler.json + - distributed.http.scheduler.api - distributed.http.health - distributed.http.proxy - distributed.http.statics diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py new file mode 100644 index 00000000000..c3c4e0e1aaf --- /dev/null +++ b/distributed/http/scheduler/api.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import json +from tornado import web + + +class APIHandler(web.RequestHandler): + def get(self): + self.write("API V1") + self.set_header("Content-Type", "text/plain") + + +class RetireWorkersHandler(web.RequestHandler): + def initialize(self, dask_server): + self.dask_server = dask_server + + async def post(self): + self.set_header("Content-Type", "text/json") + try: + params = json.loads(self.request.body) + workers_info = await self.dask_server.retire_workers(**params) + self.write(json.dumps(workers_info)) + except Exception as e: + self.set_status(400, str(e)) + self.write(json.dumps({"Error": "Bad request"})) + + +routes: list[tuple] = [ + ("/api/v1", APIHandler, {}), + ("/api/v1/retire_workers", RetireWorkersHandler, {}), +] \ No newline at end of file From 181172578c4a20fd14dcea1b1f8e2de3bb3ef161 Mon Sep 17 00:00:00 2001 From: Matt711 Date: Wed, 4 May 2022 18:04:04 -0400 Subject: [PATCH 02/15] Run pre-commit checks --- distributed/http/scheduler/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index c3c4e0e1aaf..f7e8b578147 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -1,6 +1,7 @@ from __future__ import annotations import json + from tornado import web @@ -28,4 +29,4 @@ async def post(self): routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), -] \ No newline at end of file +] From b27770ac7a2830e02774764723ef8159c7b19983 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 10 May 2022 12:46:19 -0400 Subject: [PATCH 03/15] Add adaptive_target and get_workers --- distributed/http/scheduler/api.py | 32 ++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index f7e8b578147..04b7a7abfa0 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -2,31 +2,49 @@ import json -from tornado import web +from distributed.http.utils import RequestHandler -class APIHandler(web.RequestHandler): +class APIHandler(RequestHandler): def get(self): self.write("API V1") self.set_header("Content-Type", "text/plain") -class RetireWorkersHandler(web.RequestHandler): - def initialize(self, dask_server): - self.dask_server = dask_server - +class RetireWorkersHandler(RequestHandler): async def post(self): self.set_header("Content-Type", "text/json") + scheduler = self.server try: params = json.loads(self.request.body) - workers_info = await self.dask_server.retire_workers(**params) + self.write(params) + workers_info = await scheduler.retire_workers(**params) self.write(json.dumps(workers_info)) except Exception as e: self.set_status(400, str(e)) self.write(json.dumps({"Error": "Bad request"})) +class GetWorkersHandler(RequestHandler): + def get(self): + scheduler = self.server + response = { + "num_workers": len(scheduler.workers), + "workers": [{"name": ws.name, "address": ws.address} for ws in scheduler.workers.values()] + } + self.write(response) + +class AdaptiveTargetHandler(RequestHandler): + def get(self): + scheduler = self.server + desired_workers = scheduler.adaptive_target() + response = { + "desired_workers": desired_workers, + } + self.write(response) routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), + ("/api/v1/get_workers", GetWorkersHandler, {}), + ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}), ] From b5b26d2c247cf9ebfbfb60f83da49bf28801eeba Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 10 May 2022 13:17:16 -0400 Subject: [PATCH 04/15] Add tests --- distributed/http/scheduler/api.py | 8 ++++- .../scheduler/tests/test_scheduler_http.py | 29 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 04b7a7abfa0..5441e07a6a7 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -24,15 +24,20 @@ async def post(self): self.set_status(400, str(e)) self.write(json.dumps({"Error": "Bad request"})) + class GetWorkersHandler(RequestHandler): def get(self): scheduler = self.server response = { "num_workers": len(scheduler.workers), - "workers": [{"name": ws.name, "address": ws.address} for ws in scheduler.workers.values()] + "workers": [ + {"name": ws.name, "address": ws.address} + for ws in scheduler.workers.values() + ], } self.write(response) + class AdaptiveTargetHandler(RequestHandler): def get(self): scheduler = self.server @@ -42,6 +47,7 @@ def get(self): } self.write(response) + routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index dce785fdce5..a2e57c00871 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -245,3 +245,32 @@ async def test_eventstream(c, s, a, b): ) assert "websocket" in str(s.plugins).lower() ws_client.close() + + +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_api(c, s, a, b): + http_client = AsyncHTTPClient() + + response = await http_client.fetch( + "http://localhost:%d/api/v1" % s.http_server.port + ) + assert response.code == 200 + assert response.headers["Content-Type"] == "text/plain" + + txt = response.body.decode("utf8") + assert txt == "API V1" + + +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_retire_workers(c, s, a, b): + assert True + + +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_get_workers(c, s, a, b): + assert True + + +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_adaptive_target(c, s, a, b): + assert True From dee5c8ebfa6369297b4f63cd685291209b2adafb Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 12 May 2022 11:28:44 -0400 Subject: [PATCH 05/15] Add tests for api endpoints --- .../scheduler/tests/test_scheduler_http.py | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index a2e57c00871..66e8db37f59 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -2,6 +2,7 @@ import json import re +import aiohttp import pytest pytest.importorskip("bokeh") @@ -263,14 +264,33 @@ async def test_api(c, s, a, b): @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_retire_workers(c, s, a, b): - assert True + async with aiohttp.ClientSession() as session: + params = {"workers": [a.address, b.address]} + async with session.post( + "http://localhost:%d/api/v1/retire_workers" % s.http_server.port, + json=params, + ) as resp: + assert resp.status == 200 + assert len(c.scheduler_info()["workers"]) == 0 @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_get_workers(c, s, a, b): - assert True + async with aiohttp.ClientSession() as session: + async with session.get( + "http://localhost:%d/api/v1/get_workers" % s.http_server.port + ) as resp: + assert resp.status == 200 + workers_info = json.loads(await resp.text())["workers"] + workers_address = [worker.get("address") for worker in workers_info] + assert set(workers_address) == {a.address, b.address} @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_adaptive_target(c, s, a, b): - assert True + async with aiohttp.ClientSession() as session: + async with session.get( + "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port + ) as resp: + assert resp.status == 200 + assert int(await resp.text()) == 0 From 015501d209ef77265a547344eb3f255e5d16ca9a Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 12 May 2022 13:31:46 -0400 Subject: [PATCH 06/15] Fix tests adaptive_target and retire_workers --- distributed/http/scheduler/api.py | 1 - distributed/http/scheduler/tests/test_scheduler_http.py | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 5441e07a6a7..1203b310934 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -17,7 +17,6 @@ async def post(self): scheduler = self.server try: params = json.loads(self.request.body) - self.write(params) workers_info = await scheduler.retire_workers(**params) self.write(json.dumps(workers_info)) except Exception as e: diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 66e8db37f59..571b7add4f3 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -271,7 +271,8 @@ async def test_retire_workers(c, s, a, b): json=params, ) as resp: assert resp.status == 200 - assert len(c.scheduler_info()["workers"]) == 0 + await resp.text() + assert len(c.scheduler_info()["workers"]) == 0 @gen_cluster(client=True, clean_kwargs={"threads": False}) @@ -293,4 +294,5 @@ async def test_adaptive_target(c, s, a, b): "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port ) as resp: assert resp.status == 200 - assert int(await resp.text()) == 0 + num_workers = json.loads(await resp.text())["workers"] + assert num_workers == 0 From 785f33b75c5ddfeaad879aaa7b1452e49a54261b Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 12 May 2022 14:15:55 -0400 Subject: [PATCH 07/15] Change key in adaptive_target response and remove --- distributed/http/scheduler/api.py | 2 +- distributed/http/scheduler/tests/test_scheduler_http.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 1203b310934..e61a42e98a8 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -42,7 +42,7 @@ def get(self): scheduler = self.server desired_workers = scheduler.adaptive_target() response = { - "desired_workers": desired_workers, + "workers": desired_workers, } self.write(response) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 571b7add4f3..9d7622cdf54 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -271,8 +271,6 @@ async def test_retire_workers(c, s, a, b): json=params, ) as resp: assert resp.status == 200 - await resp.text() - assert len(c.scheduler_info()["workers"]) == 0 @gen_cluster(client=True, clean_kwargs={"threads": False}) From a83ff3018994f336b72aef70d4ccf53495ac3d58 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Fri, 13 May 2022 08:28:14 -0400 Subject: [PATCH 08/15] Address comments json, headers, update tests --- distributed/http/scheduler/api.py | 6 +++-- .../scheduler/tests/test_scheduler_http.py | 22 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index e61a42e98a8..5027dc13b41 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -26,6 +26,7 @@ async def post(self): class GetWorkersHandler(RequestHandler): def get(self): + self.set_header("Content-Type", "text/json") scheduler = self.server response = { "num_workers": len(scheduler.workers), @@ -34,17 +35,18 @@ def get(self): for ws in scheduler.workers.values() ], } - self.write(response) + self.write(json.dumps(response)) class AdaptiveTargetHandler(RequestHandler): def get(self): + self.set_header("Content-Type", "text/json") scheduler = self.server desired_workers = scheduler.adaptive_target() response = { "workers": desired_workers, } - self.write(response) + self.write(json.dumps(response)) routes: list[tuple] = [ diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 9d7622cdf54..b1213bc95bd 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -250,16 +250,13 @@ async def test_eventstream(c, s, a, b): @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_api(c, s, a, b): - http_client = AsyncHTTPClient() - - response = await http_client.fetch( - "http://localhost:%d/api/v1" % s.http_server.port - ) - assert response.code == 200 - assert response.headers["Content-Type"] == "text/plain" - - txt = response.body.decode("utf8") - assert txt == "API V1" + async with aiohttp.ClientSession() as session: + async with session.get( + "http://localhost:%d/api/v1" % s.http_server.port + ) as resp: + assert resp.status == 200 + assert resp.headers["Content-Type"] == "text/plain" + assert (await resp.text()) == "API V1" @gen_cluster(client=True, clean_kwargs={"threads": False}) @@ -271,6 +268,9 @@ async def test_retire_workers(c, s, a, b): json=params, ) as resp: assert resp.status == 200 + assert resp.headers["Content-Type"] == "text/json" + retired_workers_info = json.loads(await resp.text()) + assert len(retired_workers_info) == 2 @gen_cluster(client=True, clean_kwargs={"threads": False}) @@ -280,6 +280,7 @@ async def test_get_workers(c, s, a, b): "http://localhost:%d/api/v1/get_workers" % s.http_server.port ) as resp: assert resp.status == 200 + assert resp.headers["Content-Type"] == "text/json" workers_info = json.loads(await resp.text())["workers"] workers_address = [worker.get("address") for worker in workers_info] assert set(workers_address) == {a.address, b.address} @@ -292,5 +293,6 @@ async def test_adaptive_target(c, s, a, b): "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port ) as resp: assert resp.status == 200 + assert resp.headers["Content-Type"] == "text/json" num_workers = json.loads(await resp.text())["workers"] assert num_workers == 0 From c55dbf39a03033ff61728de2eeb05452b6b81dbe Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Fri, 13 May 2022 08:41:47 -0400 Subject: [PATCH 09/15] Add documentation --- docs/source/http_services.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index fe076a08653..d3fcf66deb4 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -48,6 +48,9 @@ Pages and JSON endpoints served by the scheduler - ``/sitemap.json``: list of available endpoints - ``/statics/()``: static file content (CSS, etc) - ``/stealing``: worker occupancy metrics, to evaluate task stealing +- ``/api/v1/retire_workers`` : retire certain workers on the scheduler +- ``/api/v1/get_workers`` : get all workers on the scheduler +- ``/api/v1/adaptive_target`` : get the target number of workers based on the scheduler's load Individual bokeh plots ---------------------- From 7927b8a78c79875e91f09cdf57150fa1877a0fd1 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Fri, 13 May 2022 10:17:20 -0400 Subject: [PATCH 10/15] Add workers_to_close method and more documentation --- distributed/http/scheduler/api.py | 15 +++++++++++++++ .../scheduler/tests/test_scheduler_http.py | 14 ++++++++++++++ docs/source/http_services.rst | 19 +++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 5027dc13b41..34fc3340d58 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -24,6 +24,20 @@ async def post(self): self.write(json.dumps({"Error": "Bad request"})) +class WorkersToCloseHandler(RequestHandler): + async def post(self): + self.set_header("Content-Type", "text/json") + scheduler = self.server + try: + params = json.loads(self.request.body) + workers_to_close = {"workers": scheduler.workers_to_close(**params)} + self.write(json.dumps(workers_to_close)) + except Exception as e: + self.set_status(400, str(e)) + print(str(e)) + self.write(json.dumps({"Error": "Bad request"})) + + class GetWorkersHandler(RequestHandler): def get(self): self.set_header("Content-Type", "text/json") @@ -52,6 +66,7 @@ def get(self): routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), + ("/api/v1/workers_to_close", WorkersToCloseHandler, {}), ("/api/v1/get_workers", GetWorkersHandler, {}), ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}), ] diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index b1213bc95bd..2c2a9350996 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -273,6 +273,20 @@ async def test_retire_workers(c, s, a, b): assert len(retired_workers_info) == 2 +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_workers_to_close(c, s, a, b): + async with aiohttp.ClientSession() as session: + params = {"n": 2} + async with session.post( + "http://localhost:%d/api/v1/workers_to_close" % s.http_server.port, + json=params, + ) as resp: + assert resp.status == 200 + assert resp.headers["Content-Type"] == "text/json" + workers_to_close = json.loads(await resp.text())["workers"] + assert len(workers_to_close) == 2 + + @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_get_workers(c, s, a, b): async with aiohttp.ClientSession() as session: diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index d3fcf66deb4..885a49e3a7b 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -48,7 +48,26 @@ Pages and JSON endpoints served by the scheduler - ``/sitemap.json``: list of available endpoints - ``/statics/()``: static file content (CSS, etc) - ``/stealing``: worker occupancy metrics, to evaluate task stealing + +Scheduler API +------------- + +Scheduler methods exposed by the API with an example of the request body they take + - ``/api/v1/retire_workers`` : retire certain workers on the scheduler + +.. code-block:: json + { + "workers":["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"] + } + +- ``/api/v1/workers_to_close`` : get a list of n workers that the scheduler can safely close + +.. code-block:: json + { + "n":2 + } + - ``/api/v1/get_workers`` : get all workers on the scheduler - ``/api/v1/adaptive_target`` : get the target number of workers based on the scheduler's load From ba955023784850baef61e82ba13857248fa8a499 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 13 May 2022 15:23:57 +0100 Subject: [PATCH 11/15] Apply suggestions from code review --- docs/source/http_services.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index 885a49e3a7b..fa83f782fa2 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -57,6 +57,7 @@ Scheduler methods exposed by the API with an example of the request body they ta - ``/api/v1/retire_workers`` : retire certain workers on the scheduler .. code-block:: json + { "workers":["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"] } @@ -64,6 +65,7 @@ Scheduler methods exposed by the API with an example of the request body they ta - ``/api/v1/workers_to_close`` : get a list of n workers that the scheduler can safely close .. code-block:: json + { "n":2 } From f2c89611c383fc681fa3e87aa6d7ded9906a94fd Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 16 May 2022 07:48:21 -0400 Subject: [PATCH 12/15] Address comments from fjetter --- distributed/http/scheduler/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 34fc3340d58..efb35cc51f6 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -33,8 +33,7 @@ async def post(self): workers_to_close = {"workers": scheduler.workers_to_close(**params)} self.write(json.dumps(workers_to_close)) except Exception as e: - self.set_status(400, str(e)) - print(str(e)) + self.set_status(500, str(e)) self.write(json.dumps({"Error": "Bad request"})) From 97275b7ee35a026b9d433626bd48dfdf1cb8fbd2 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 16 May 2022 12:03:23 -0400 Subject: [PATCH 13/15] Address more comments --- distributed/http/scheduler/api.py | 53 +++++++++---------- .../scheduler/tests/test_scheduler_http.py | 14 ----- 2 files changed, 24 insertions(+), 43 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index efb35cc51f6..9e124521b66 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -17,55 +17,50 @@ async def post(self): scheduler = self.server try: params = json.loads(self.request.body) - workers_info = await scheduler.retire_workers(**params) + workers = params["workers"] + workers_info = await scheduler.retire_workers(workers) self.write(json.dumps(workers_info)) - except Exception as e: - self.set_status(400, str(e)) - self.write(json.dumps({"Error": "Bad request"})) - - -class WorkersToCloseHandler(RequestHandler): - async def post(self): - self.set_header("Content-Type", "text/json") - scheduler = self.server - try: - params = json.loads(self.request.body) - workers_to_close = {"workers": scheduler.workers_to_close(**params)} - self.write(json.dumps(workers_to_close)) except Exception as e: self.set_status(500, str(e)) - self.write(json.dumps({"Error": "Bad request"})) + self.write(json.dumps({"Error": "Internal Server Error"})) class GetWorkersHandler(RequestHandler): def get(self): self.set_header("Content-Type", "text/json") scheduler = self.server - response = { - "num_workers": len(scheduler.workers), - "workers": [ - {"name": ws.name, "address": ws.address} - for ws in scheduler.workers.values() - ], - } - self.write(json.dumps(response)) + try: + response = { + "num_workers": len(scheduler.workers), + "workers": [ + {"name": ws.name, "address": ws.address} + for ws in scheduler.workers.values() + ], + } + self.write(json.dumps(response)) + except Exception as e: + self.set_status(500, str(e)) + self.write(json.dumps({"Error": "Internal Server Error"})) class AdaptiveTargetHandler(RequestHandler): def get(self): self.set_header("Content-Type", "text/json") scheduler = self.server - desired_workers = scheduler.adaptive_target() - response = { - "workers": desired_workers, - } - self.write(json.dumps(response)) + try: + desired_workers = scheduler.adaptive_target() + response = { + "workers": desired_workers, + } + self.write(json.dumps(response)) + except Exception as e: + self.set_status(500, str(e)) + self.write(json.dumps({"Error": "Internal Server Error"})) routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), - ("/api/v1/workers_to_close", WorkersToCloseHandler, {}), ("/api/v1/get_workers", GetWorkersHandler, {}), ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}), ] diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 2c2a9350996..b1213bc95bd 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -273,20 +273,6 @@ async def test_retire_workers(c, s, a, b): assert len(retired_workers_info) == 2 -@gen_cluster(client=True, clean_kwargs={"threads": False}) -async def test_workers_to_close(c, s, a, b): - async with aiohttp.ClientSession() as session: - params = {"n": 2} - async with session.post( - "http://localhost:%d/api/v1/workers_to_close" % s.http_server.port, - json=params, - ) as resp: - assert resp.status == 200 - assert resp.headers["Content-Type"] == "text/json" - workers_to_close = json.loads(await resp.text())["workers"] - assert len(workers_to_close) == 2 - - @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_get_workers(c, s, a, b): async with aiohttp.ClientSession() as session: From 65c622f371948bff4347c66dec61208d93ce6198 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 16 May 2022 12:22:08 -0400 Subject: [PATCH 14/15] Remove workers_to_close from doc --- docs/source/http_services.rst | 8 -------- 1 file changed, 8 deletions(-) diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index fa83f782fa2..31bb62292a7 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -62,14 +62,6 @@ Scheduler methods exposed by the API with an example of the request body they ta "workers":["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"] } -- ``/api/v1/workers_to_close`` : get a list of n workers that the scheduler can safely close - -.. code-block:: json - - { - "n":2 - } - - ``/api/v1/get_workers`` : get all workers on the scheduler - ``/api/v1/adaptive_target`` : get the target number of workers based on the scheduler's load From 15ad9923b57fea51ccd33d7514bef6c337765d06 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 16 May 2022 13:34:17 -0400 Subject: [PATCH 15/15] Add support for closing n workers --- distributed/http/scheduler/api.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 9e124521b66..3f4e80194fe 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -17,8 +17,13 @@ async def post(self): scheduler = self.server try: params = json.loads(self.request.body) - workers = params["workers"] - workers_info = await scheduler.retire_workers(workers) + n_workers = params.get("n", 0) + if n_workers: + workers = scheduler.workers_to_close(n=n_workers) + workers_info = await scheduler.retire_workers(workers=workers) + else: + workers = params.get("workers", {}) + workers_info = await scheduler.retire_workers(workers=workers) self.write(json.dumps(workers_info)) except Exception as e: self.set_status(500, str(e))