From fbcdb572840c188456dbda7bde326d9ae9bd5a6d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 15:27:12 +0200 Subject: [PATCH 01/14] Check for all expected metrics --- .../scheduler/tests/test_semaphore_http.py | 1 + .../http/worker/tests/test_worker_http.py | 46 +++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 93ac888f237..80a07f64092 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -17,6 +17,7 @@ async def test_prometheus_collect_task_states(c, s, a, b): async def fetch_metrics(): port = s.http_server.port response = await http_client.fetch(f"http://localhost:{port}/metrics") + assert response.code == 200 txt = response.body.decode("utf8") families = { family.name: family diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index b0fc7c1227c..e767aa1ebdf 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -15,18 +15,46 @@ async def test_prometheus(c, s, a, b): http_client = AsyncHTTPClient() - # request data twice since there once was a case where metrics got registered - # multiple times resulting in prometheus_client errors - for _ in range(2): - response = await http_client.fetch( - "http://localhost:%d/metrics" % a.http_server.port - ) + async def fetch_metrics(): + port = a.http_server.port + response = await http_client.fetch(f"http://localhost:{port}/metrics") assert response.code == 200 assert response.headers["Content-Type"] == "text/plain; version=0.0.4" - txt = response.body.decode("utf8") - families = {familiy.name for familiy in text_string_to_metric_families(txt)} - assert "dask_worker_latency_seconds" in families + families = { + family.name: family + for family in text_string_to_metric_families(txt) + if family.name.startswith("dask_worker_") + } + return families + + active_metrics = await fetch_metrics() + + expected_metrics = { + "dask_worker_tasks", + "dask_worker_concurrent_fetch_requests", + "dask_worker_threads", + "dask_worker_latency_seconds", + } + + try: + import crick # noqa: F401 + except ImportError: + pass + else: + expected_metrics = expected_metrics.union( + { + "dask_worker_tick_duration_median_seconds", + "dask_worker_task_duration_median_seconds", + "dask_worker_transfer_bandwidth_median_bytes", + } + ) + + assert active_metrics.keys() == expected_metrics + + # request data twice since there once was a case where metrics got registered + # multiple times resulting in prometheus_client errors + await fetch_metrics() @gen_cluster(client=True) From d2a437fd097e05206087903dd2fab9af4327c592 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 15:36:28 +0200 Subject: [PATCH 02/14] Check for all expected metrics in scheduler prometheus collector --- .../scheduler/tests/test_scheduler_http.py | 34 +++++++++++++------ .../scheduler/tests/test_semaphore_http.py | 2 ++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index c2d75d8206f..8afb53f171a 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -93,23 +93,37 @@ async def test_prometheus(c, s, a, b): http_client = AsyncHTTPClient() - # request data twice since there once was a case where metrics got registered multiple times resulting in - # prometheus_client errors - for _ in range(2): - response = await http_client.fetch( - "http://localhost:%d/metrics" % s.http_server.port - ) + async def fetch_metrics(): + port = s.http_server.port + response = await http_client.fetch(f"http://localhost:{port}/metrics") assert response.code == 200 assert response.headers["Content-Type"] == "text/plain; version=0.0.4" txt = response.body.decode("utf8") families = { - family.name: family for family in text_string_to_metric_families(txt) + family.name: family + for family in text_string_to_metric_families(txt) + if family.name.startswith("dask_scheduler_") } - assert "dask_scheduler_workers" in families + return families + + active_metrics = await fetch_metrics() + + expected_metrics = { + "dask_scheduler_clients", + "dask_scheduler_desired_workers", + "dask_scheduler_workers", + "dask_scheduler_tasks", + "dask_scheduler_tasks_suspicious", + "dask_scheduler_tasks_forgotten", + } - client = families["dask_scheduler_clients"] - assert client.samples[0].value == 1.0 + assert active_metrics.keys() == expected_metrics + assert active_metrics["dask_scheduler_clients"].samples[0].value == 1.0 + + # request data twice since there once was a case where metrics got registered multiple times resulting in + # prometheus_client errors + await fetch_metrics() @gen_cluster(client=True, clean_kwargs={"threads": False}) diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 80a07f64092..c2cf1a65817 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -18,6 +18,8 @@ async def fetch_metrics(): port = s.http_server.port response = await http_client.fetch(f"http://localhost:{port}/metrics") assert response.code == 200 + assert response.headers["Content-Type"] == "text/plain; version=0.0.4" + txt = response.body.decode("utf8") families = { family.name: family From 84d7656ba47a2917c857b9520f6038a6f91c29f7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 15:39:30 +0200 Subject: [PATCH 03/14] Fix test name --- distributed/http/scheduler/tests/test_semaphore_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index c2cf1a65817..d7fffcb257a 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -8,7 +8,7 @@ @gen_cluster(client=True, clean_kwargs={"threads": False}) -async def test_prometheus_collect_task_states(c, s, a, b): +async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families From 0834fc8fa579373c9b23eb2f7cb7b10c87323aa1 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 15:53:02 +0200 Subject: [PATCH 04/14] Test prometheus worker task states --- .../http/worker/tests/test_worker_http.py | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index e767aa1ebdf..3d709d42068 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -1,11 +1,12 @@ from __future__ import annotations +import asyncio import json import pytest from tornado.httpclient import AsyncHTTPClient -from distributed.utils_test import gen_cluster +from distributed.utils_test import gen_cluster, slowinc @gen_cluster(client=True) @@ -57,6 +58,58 @@ async def fetch_metrics(): await fetch_metrics() +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)], clean_kwargs={"threads": False}) +async def test_prometheus_task_states(c, s, a, b): + pytest.importorskip("prometheus_client") + from prometheus_client.parser import text_string_to_metric_families + + http_client = AsyncHTTPClient() + + async def fetch_metrics(): + port = a.http_server.port + response = await http_client.fetch(f"http://localhost:{port}/metrics") + assert response.code == 200 + assert response.headers["Content-Type"] == "text/plain; version=0.0.4" + txt = response.body.decode("utf8") + families = { + family.name: family + for family in text_string_to_metric_families(txt) + if family.name.startswith("dask_worker_") + } + active_metrics = { + sample.labels["state"]: sample.value + for sample in families["dask_worker_tasks"].samples + } + return active_metrics + + expected_metrics = {"stored", "executing", "ready", "waiting"} + assert not a.state.tasks + active_metrics = await fetch_metrics() + assert active_metrics.keys() == expected_metrics + assert sum(active_metrics.values()) == 0.0 + + # submit a task which should show up in the prometheus scraping + future = c.submit(slowinc, 1, delay=0.5) + while future.key not in a.state.tasks: + await asyncio.sleep(0.001) + + active_metrics = await fetch_metrics() + assert active_metrics.keys() == expected_metrics + assert sum(active_metrics.values()) == 1.0 + + res = await c.gather(future) + assert res == 2 + + future.release() + + while future.key in a.state.tasks: + await asyncio.sleep(0.001) + + active_metrics = await fetch_metrics() + assert active_metrics.keys() == expected_metrics + assert sum(active_metrics.values()) == 0.0 + + @gen_cluster(client=True) async def test_health(c, s, a, b): http_client = AsyncHTTPClient() From 981aad0bf28d212515d160e1b3a57c6fa2dc6d23 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 16:27:25 +0200 Subject: [PATCH 05/14] Refactor to event-based test --- distributed/http/worker/tests/test_worker_http.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 3d709d42068..54ff6d96894 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -6,7 +6,8 @@ import pytest from tornado.httpclient import AsyncHTTPClient -from distributed.utils_test import gen_cluster, slowinc +from distributed import Event +from distributed.utils_test import gen_cluster @gen_cluster(client=True) @@ -58,7 +59,7 @@ async def fetch_metrics(): await fetch_metrics() -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)], clean_kwargs={"threads": False}) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_prometheus_task_states(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families @@ -88,8 +89,10 @@ async def fetch_metrics(): assert active_metrics.keys() == expected_metrics assert sum(active_metrics.values()) == 0.0 + ev = Event() + # submit a task which should show up in the prometheus scraping - future = c.submit(slowinc, 1, delay=0.5) + future = c.submit(ev.wait) while future.key not in a.state.tasks: await asyncio.sleep(0.001) @@ -97,8 +100,8 @@ async def fetch_metrics(): assert active_metrics.keys() == expected_metrics assert sum(active_metrics.values()) == 1.0 - res = await c.gather(future) - assert res == 2 + await ev.set() + await c.gather(future) future.release() From b4a3aa1bb8a66fafd53b2471f68cb8e93f08167f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 19:07:58 +0200 Subject: [PATCH 06/14] Improve singleton pattern --- distributed/http/worker/prometheus/core.py | 17 ++++++++++------- .../http/worker/tests/test_worker_http.py | 6 +++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index f95cf415b9f..561f06b9f89 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -77,19 +77,22 @@ def collect(self): class PrometheusHandler(RequestHandler): - _initialized = False + _collector: WorkerMetricCollector | None = None - def __init__(self, *args, **kwargs): + def __init__(self, *args, dask_server=None, **kwargs): import prometheus_client - super().__init__(*args, **kwargs) + super().__init__(*args, dask_server=dask_server, **kwargs) - if PrometheusHandler._initialized: + if PrometheusHandler._collector: + # Especially during testing, multiple workers are started + # sequentially in the same python process + PrometheusHandler._collector.server = self.server return - prometheus_client.REGISTRY.register(WorkerMetricCollector(self.server)) - - PrometheusHandler._initialized = True + PrometheusHandler._collector = WorkerMetricCollector(self.server) + # Register collector + prometheus_client.REGISTRY.register(PrometheusHandler._collector) def get(self): import prometheus_client diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 54ff6d96894..285a39a0330 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -10,8 +10,8 @@ from distributed.utils_test import gen_cluster -@gen_cluster(client=True) -async def test_prometheus(c, s, a, b): +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) +async def test_prometheus(c, s, a): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families @@ -60,7 +60,7 @@ async def fetch_metrics(): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) -async def test_prometheus_task_states(c, s, a, b): +async def test_prometheus_collect_task_states(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families From ac6876df650ce3bfaf037eb960b610c9c50bd75d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 Aug 2022 21:23:06 +0200 Subject: [PATCH 07/14] Fix test --- distributed/http/worker/tests/test_worker_http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 285a39a0330..fde95500f49 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -60,7 +60,7 @@ async def fetch_metrics(): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) -async def test_prometheus_collect_task_states(c, s, a, b): +async def test_prometheus_collect_task_states(c, s, a): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families @@ -93,7 +93,7 @@ async def fetch_metrics(): # submit a task which should show up in the prometheus scraping future = c.submit(ev.wait) - while future.key not in a.state.tasks: + while not a.state.executing: await asyncio.sleep(0.001) active_metrics = await fetch_metrics() From f79e0034829ecef9006658979936377863533326 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 11:37:03 +0200 Subject: [PATCH 08/14] Update distributed/http/worker/tests/test_worker_http.py Co-authored-by: Gabe Joseph --- distributed/http/worker/tests/test_worker_http.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index fde95500f49..555cb3d2892 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -86,8 +86,7 @@ async def fetch_metrics(): expected_metrics = {"stored", "executing", "ready", "waiting"} assert not a.state.tasks active_metrics = await fetch_metrics() - assert active_metrics.keys() == expected_metrics - assert sum(active_metrics.values()) == 0.0 + assert active_metrics == {"stored": 0.0, ""executing": 0.0, "ready": 0.0, "waiting": 0.0} ev = Event() From a6dd39a182b30493a962b9171e88e24411690e86 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 11:37:11 +0200 Subject: [PATCH 09/14] Update distributed/http/worker/tests/test_worker_http.py Co-authored-by: Gabe Joseph --- distributed/http/worker/tests/test_worker_http.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 555cb3d2892..66ceff88a96 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -96,8 +96,7 @@ async def fetch_metrics(): await asyncio.sleep(0.001) active_metrics = await fetch_metrics() - assert active_metrics.keys() == expected_metrics - assert sum(active_metrics.values()) == 1.0 + assert active_metrics == {"stored": 0.0, ""executing": 1.0, "ready": 0.0, "waiting": 0.0} await ev.set() await c.gather(future) From c034689478b5e831e3a6b672338f346f50ce25cb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 11:37:17 +0200 Subject: [PATCH 10/14] Update distributed/http/worker/tests/test_worker_http.py Co-authored-by: Gabe Joseph --- distributed/http/worker/tests/test_worker_http.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 66ceff88a96..98404d30b3f 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -107,8 +107,7 @@ async def fetch_metrics(): await asyncio.sleep(0.001) active_metrics = await fetch_metrics() - assert active_metrics.keys() == expected_metrics - assert sum(active_metrics.values()) == 0.0 + assert active_metrics == {"stored": 0.0, ""executing": 0.0, "ready": 0.0, "waiting": 0.0} @gen_cluster(client=True) From c20f075f796aaa6b15d9d98577bd801b28cf6518 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 11:40:50 +0200 Subject: [PATCH 11/14] Update distributed/http/worker/prometheus/core.py Co-authored-by: Gabe Joseph --- distributed/http/worker/prometheus/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index 561f06b9f89..bcb788a2780 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -77,7 +77,7 @@ def collect(self): class PrometheusHandler(RequestHandler): - _collector: WorkerMetricCollector | None = None + _collector: ClassVar[WorkerMetricCollector | None] = None def __init__(self, *args, dask_server=None, **kwargs): import prometheus_client From 35a970029d5511869a9033ffa86f3e106ac9fc3f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 12:33:41 +0200 Subject: [PATCH 12/14] Fix typos --- .../http/worker/tests/test_worker_http.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 98404d30b3f..64bdfac8e16 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -86,7 +86,12 @@ async def fetch_metrics(): expected_metrics = {"stored", "executing", "ready", "waiting"} assert not a.state.tasks active_metrics = await fetch_metrics() - assert active_metrics == {"stored": 0.0, ""executing": 0.0, "ready": 0.0, "waiting": 0.0} + assert active_metrics == { + "stored": 0.0, + "executing": 0.0, + "ready": 0.0, + "waiting": 0.0, + } ev = Event() @@ -96,7 +101,12 @@ async def fetch_metrics(): await asyncio.sleep(0.001) active_metrics = await fetch_metrics() - assert active_metrics == {"stored": 0.0, ""executing": 1.0, "ready": 0.0, "waiting": 0.0} + assert active_metrics == { + "stored": 0.0, + "executing": 1.0, + "ready": 0.0, + "waiting": 0.0, + } await ev.set() await c.gather(future) @@ -107,7 +117,12 @@ async def fetch_metrics(): await asyncio.sleep(0.001) active_metrics = await fetch_metrics() - assert active_metrics == {"stored": 0.0, ""executing": 0.0, "ready": 0.0, "waiting": 0.0} + assert active_metrics == { + "stored": 0.0, + "executing": 0.0, + "ready": 0.0, + "waiting": 0.0, + } @gen_cluster(client=True) From 157d25b61f8cccfe50b634443cd1f609b4b6a705 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 12:50:10 +0200 Subject: [PATCH 13/14] Extract fetch_metrics --- .../scheduler/tests/test_scheduler_http.py | 41 ++++------------- .../scheduler/tests/test_semaphore_http.py | 30 +++---------- .../http/worker/tests/test_worker_http.py | 44 ++++--------------- distributed/utils_test.py | 16 +++++++ 4 files changed, 38 insertions(+), 93 deletions(-) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 8afb53f171a..1b36807f133 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -17,7 +17,7 @@ from distributed import Lock from distributed.utils import is_valid_xml -from distributed.utils_test import gen_cluster, inc, lock_inc, slowinc +from distributed.utils_test import fetch_metrics, gen_cluster, inc, lock_inc, slowinc DEFAULT_ROUTES = dask.config.get("distributed.scheduler.http.routes") @@ -89,25 +89,8 @@ async def test_prefix(c, s, a, b): @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") - from prometheus_client.parser import text_string_to_metric_families - http_client = AsyncHTTPClient() - - async def fetch_metrics(): - port = s.http_server.port - response = await http_client.fetch(f"http://localhost:{port}/metrics") - assert response.code == 200 - assert response.headers["Content-Type"] == "text/plain; version=0.0.4" - - txt = response.body.decode("utf8") - families = { - family.name: family - for family in text_string_to_metric_families(txt) - if family.name.startswith("dask_scheduler_") - } - return families - - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_scheduler_") expected_metrics = { "dask_scheduler_clients", @@ -123,23 +106,15 @@ async def fetch_metrics(): # request data twice since there once was a case where metrics got registered multiple times resulting in # prometheus_client errors - await fetch_metrics() + await fetch_metrics(s.http_server.port, "dask_scheduler_") @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_prometheus_collect_task_states(c, s, a, b): pytest.importorskip("prometheus_client") - from prometheus_client.parser import text_string_to_metric_families - - http_client = AsyncHTTPClient() - async def fetch_metrics(): - port = s.http_server.port - response = await http_client.fetch(f"http://localhost:{port}/metrics") - txt = response.body.decode("utf8") - families = { - family.name: family for family in text_string_to_metric_families(txt) - } + async def fetch_state_metrics(): + families = await fetch_metrics(s.http_server.port, prefix="dask_scheduler_") active_metrics = { sample.labels["state"]: sample.value @@ -156,7 +131,7 @@ async def fetch_metrics(): # Ensure that we get full zero metrics for all states even though the # scheduler did nothing, yet assert not s.tasks - active_metrics, forgotten_tasks = await fetch_metrics() + active_metrics, forgotten_tasks = await fetch_state_metrics() assert active_metrics.keys() == expected assert sum(active_metrics.values()) == 0.0 assert sum(forgotten_tasks) == 0.0 @@ -166,7 +141,7 @@ async def fetch_metrics(): while not any(future.key in w.state.tasks for w in [a, b]): await asyncio.sleep(0.001) - active_metrics, forgotten_tasks = await fetch_metrics() + active_metrics, forgotten_tasks = await fetch_state_metrics() assert active_metrics.keys() == expected assert sum(active_metrics.values()) == 1.0 assert sum(forgotten_tasks) == 0.0 @@ -179,7 +154,7 @@ async def fetch_metrics(): while any(future.key in w.state.tasks for w in [a, b]): await asyncio.sleep(0.001) - active_metrics, forgotten_tasks = await fetch_metrics() + active_metrics, forgotten_tasks = await fetch_state_metrics() assert active_metrics.keys() == expected assert sum(active_metrics.values()) == 0.0 assert sum(forgotten_tasks) == 0.0 diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index d7fffcb257a..cae0395487e 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -1,34 +1,16 @@ from __future__ import annotations import pytest -from tornado.httpclient import AsyncHTTPClient from distributed import Semaphore -from distributed.utils_test import gen_cluster +from distributed.utils_test import fetch_metrics, gen_cluster @gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") - from prometheus_client.parser import text_string_to_metric_families - http_client = AsyncHTTPClient() - - async def fetch_metrics(): - port = s.http_server.port - response = await http_client.fetch(f"http://localhost:{port}/metrics") - assert response.code == 200 - assert response.headers["Content-Type"] == "text/plain; version=0.0.4" - - txt = response.body.decode("utf8") - families = { - family.name: family - for family in text_string_to_metric_families(txt) - if family.name.startswith("dask_semaphore_") - } - return families - - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_semaphore_") expected_metrics = { "dask_semaphore_max_leases", @@ -45,7 +27,7 @@ async def fetch_metrics(): sem = await Semaphore(name="test", max_leases=2) - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_semaphore_") assert active_metrics.keys() == expected_metrics # Assert values are set upon intialization for name, v in active_metrics.items(): @@ -59,7 +41,7 @@ async def fetch_metrics(): assert sample.value == 0 assert await sem.acquire() - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_semaphore_") assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 1 assert ( @@ -71,7 +53,7 @@ async def fetch_metrics(): assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 assert await sem.release() is True - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_semaphore_") assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 0 assert ( @@ -83,7 +65,7 @@ async def fetch_metrics(): assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 await sem.close() - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(s.http_server.port, "dask_semaphore_") assert active_metrics.keys() == expected_metrics for v in active_metrics.values(): assert v.samples == [] diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 64bdfac8e16..02d3d4dba59 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -7,30 +7,14 @@ from tornado.httpclient import AsyncHTTPClient from distributed import Event -from distributed.utils_test import gen_cluster +from distributed.utils_test import fetch_metrics, gen_cluster @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_prometheus(c, s, a): pytest.importorskip("prometheus_client") - from prometheus_client.parser import text_string_to_metric_families - http_client = AsyncHTTPClient() - - async def fetch_metrics(): - port = a.http_server.port - response = await http_client.fetch(f"http://localhost:{port}/metrics") - assert response.code == 200 - assert response.headers["Content-Type"] == "text/plain; version=0.0.4" - txt = response.body.decode("utf8") - families = { - family.name: family - for family in text_string_to_metric_families(txt) - if family.name.startswith("dask_worker_") - } - return families - - active_metrics = await fetch_metrics() + active_metrics = await fetch_metrics(a.http_server.port, prefix="dask_worker_") expected_metrics = { "dask_worker_tasks", @@ -56,27 +40,15 @@ async def fetch_metrics(): # request data twice since there once was a case where metrics got registered # multiple times resulting in prometheus_client errors - await fetch_metrics() + await fetch_metrics(a.http_server.port, prefix="dask_worker_") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_prometheus_collect_task_states(c, s, a): pytest.importorskip("prometheus_client") - from prometheus_client.parser import text_string_to_metric_families - http_client = AsyncHTTPClient() - - async def fetch_metrics(): - port = a.http_server.port - response = await http_client.fetch(f"http://localhost:{port}/metrics") - assert response.code == 200 - assert response.headers["Content-Type"] == "text/plain; version=0.0.4" - txt = response.body.decode("utf8") - families = { - family.name: family - for family in text_string_to_metric_families(txt) - if family.name.startswith("dask_worker_") - } + async def fetch_state_metrics(): + families = await fetch_metrics(a.http_server.port, prefix="dask_worker_") active_metrics = { sample.labels["state"]: sample.value for sample in families["dask_worker_tasks"].samples @@ -85,7 +57,7 @@ async def fetch_metrics(): expected_metrics = {"stored", "executing", "ready", "waiting"} assert not a.state.tasks - active_metrics = await fetch_metrics() + active_metrics = await fetch_state_metrics() assert active_metrics == { "stored": 0.0, "executing": 0.0, @@ -100,7 +72,7 @@ async def fetch_metrics(): while not a.state.executing: await asyncio.sleep(0.001) - active_metrics = await fetch_metrics() + active_metrics = await fetch_state_metrics() assert active_metrics == { "stored": 0.0, "executing": 1.0, @@ -116,7 +88,7 @@ async def fetch_metrics(): while future.key in a.state.tasks: await asyncio.sleep(0.001) - active_metrics = await fetch_metrics() + active_metrics = await fetch_state_metrics() assert active_metrics == { "stored": 0.0, "executing": 0.0, diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 450290a0b81..da02ee9f9d2 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -33,6 +33,7 @@ import pytest import yaml from tlz import assoc, memoize, merge +from tornado.httpclient import AsyncHTTPClient from tornado.ioloop import IOLoop import dask @@ -2440,3 +2441,18 @@ def _bind_port(port): raise TimeoutError(f"Default ports didn't open up in time for {name_of_test}") yield + + +async def fetch_metrics(port: int, prefix: str | None = None) -> dict[str, Any]: + from prometheus_client.parser import text_string_to_metric_families + + http_client = AsyncHTTPClient() + response = await http_client.fetch(f"http://localhost:{port}/metrics") + assert response.code == 200 + txt = response.body.decode("utf8") + families = { + family.name: family + for family in text_string_to_metric_families(txt) + if prefix is None or family.name.startswith(prefix) + } + return families From 51f9781a8f513ec5b83b44ee6488823d24b3aa26 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 30 Aug 2022 12:56:02 +0200 Subject: [PATCH 14/14] Fix lint --- distributed/http/worker/prometheus/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index bcb788a2780..8e40957bf83 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from typing import ClassVar from distributed.http.prometheus import PrometheusCollector from distributed.http.utils import RequestHandler