diff --git a/distributed/client.py b/distributed/client.py index ef14a39b0cb..b74a8526e0a 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -52,7 +52,7 @@ from tornado import gen from tornado.ioloop import PeriodicCallback -from . import preloading +from . import cluster_dump, preloading from . import versions as version_module # type: ignore from .batched import BatchedSend from .cfexecutor import ClientExecutor @@ -3826,108 +3826,73 @@ def scheduler_info(self, **kwargs): self.sync(self._update_scheduler_info) return self._scheduler_identity - async def _dump_cluster_state( - self, - filename: str, - exclude: Collection[str], - format: Literal["msgpack", "yaml"], - ) -> None: - - scheduler_info = self.scheduler.dump_state(exclude=exclude) - - workers_info = self.scheduler.broadcast( - msg={"op": "dump_state", "exclude": exclude}, - on_error="return_pickle", - ) - versions_info = self._get_versions() - scheduler_info, workers_info, versions_info = await asyncio.gather( - scheduler_info, workers_info, versions_info - ) - # Unpickle RPC errors and convert them to string - workers_info = { - k: repr(loads(v)) if isinstance(v, bytes) else v - for k, v in workers_info.items() - } - - state = { - "scheduler": scheduler_info, - "workers": workers_info, - "versions": versions_info, - } - - def tuple_to_list(node): - if isinstance(node, (list, tuple)): - return [tuple_to_list(el) for el in node] - elif isinstance(node, dict): - return {k: tuple_to_list(v) for k, v in node.items()} - else: - return node - - # lists are converted to tuples by the RPC - state = tuple_to_list(state) - - filename = str(filename) - if format == "msgpack": - import gzip - - import msgpack - - suffix = ".msgpack.gz" - if not filename.endswith(suffix): - filename += suffix - - with gzip.open(filename, "wb") as fdg: - msgpack.pack(state, fdg) - elif format == "yaml": - import yaml - - suffix = ".yaml" - if not filename.endswith(suffix): - filename += suffix - - with open(filename, 
"w") as fd: - yaml.dump(state, fd) - else: - raise ValueError( - f"Unsupported format {format}. Possible values are `msgpack` or `yaml`" - ) - def dump_cluster_state( self, filename: str = "dask-cluster-dump", + write_from_scheduler: bool | None = None, exclude: Collection[str] = ("run_spec",), format: Literal["msgpack", "yaml"] = "msgpack", + **storage_options, ): - """Extract a dump of the entire cluster state and persist to disk. + """Extract a dump of the entire cluster state and persist to disk or a URL. This is intended for debugging purposes only. - Warning: Memory usage on client side can be large. + Warning: Memory usage on the scheduler (and client, if writing the dump locally) + can be large. On a large or long-running cluster, this can take several minutes. + The scheduler may be unresponsive while the dump is processed. Results will be stored in a dict:: { - "scheduler_info": {...}, - "worker_info": { - worker_addr: {...}, # worker attributes + "scheduler": {...}, # scheduler state + "workers": { + worker_addr: {...}, # worker state ... } + "versions": { + "scheduler": {...}, + "workers": { + worker_addr: {...}, + ... + } + } } Parameters ---------- filename: - The output filename. The appropriate file suffix (`.msgpack.gz` or - `.yaml`) will be appended automatically. + The path or URL to write to. The appropriate file suffix (``.msgpack.gz`` or + ``.yaml``) will be appended automatically. + + Must be a path supported by :func:`fsspec.open` (like ``s3://my-bucket/cluster-dump``, + or ``cluster-dumps/dump``). See ``write_from_scheduler`` to control whether + the dump is written directly to ``filename`` from the scheduler, or sent + back to the client over the network, then written locally. + write_from_scheduler: + If None (default), infer based on whether ``filename`` looks like a URL + or a local path: True if the filename contains ``://`` (like + ``s3://my-bucket/cluster-dump``), False otherwise (like ``local_dir/cluster-dump``). 
def _tuple_to_list(node):
    """Return a copy of ``node`` in which every tuple has become a list.

    Lists, tuples and dict values are rebuilt recursively. Dict keys and any
    other kind of object are returned unchanged.
    """
    if isinstance(node, (list, tuple)):
        return [_tuple_to_list(el) for el in node]
    if isinstance(node, dict):
        return {k: _tuple_to_list(v) for k, v in node.items()}
    return node


def _with_suffix(url, suffix: str) -> str:
    """Return ``url`` as a ``str`` ending in ``suffix``, appending it if missing."""
    # `str()` lets callers hand in path-like objects (e.g. `pathlib.Path`),
    # which have no `.endswith`; plain strings pass through unchanged.
    url = str(url)
    return url if url.endswith(suffix) else url + suffix


async def write_state(
    get_state: Callable[[], Awaitable[Any]],
    url: str,
    format: Literal["msgpack", "yaml"],
    # The annotation on ``**kwargs`` describes each *value*, hence ``Any``.
    **storage_options: Any,
) -> None:
    """Await a cluster state dump, then serialize it and write it to ``url``.

    Parameters
    ----------
    get_state:
        Zero-argument callable returning an awaitable that resolves to the
        state to write. It is only called once the output file has been
        opened.
    url:
        Path or URL to write to, opened with :func:`fsspec.open`. The value is
        converted with ``str`` and the suffix matching ``format``
        (``.msgpack.gz`` or ``.yaml``) is appended when it is not already
        there.
    format:
        ``"msgpack"`` writes with ``msgpack.pack`` in binary mode; ``"yaml"``
        writes with ``yaml.dump`` in text mode after converting tuples to
        lists.
    **storage_options:
        Additional keyword arguments forwarded to :func:`fsspec.open`.

    Raises
    ------
    ValueError
        If ``format`` is neither ``"msgpack"`` nor ``"yaml"``. This is raised
        before the file is opened and before ``get_state`` is called.
    """
    if format == "msgpack":
        mode = "wb"
        url = _with_suffix(url, ".msgpack.gz")
        writer = msgpack.pack
    elif format == "yaml":
        import yaml

        mode = "w"
        url = _with_suffix(url, ".yaml")

        def writer(state: dict, f: IO) -> None:
            # YAML adds unnecessary `!!python/tuple` tags; convert tuples to
            # lists to avoid them. Unnecessary for msgpack, since tuples and
            # lists are encoded the same.
            yaml.dump(_tuple_to_list(state), f)

    else:
        raise ValueError(
            f"Unsupported format {format!r}. Possible values are 'msgpack' or 'yaml'."
        )

    # Eagerly open the file to catch any errors before doing the full dump
    # NOTE: `compression="infer"` will automatically use gzip via the `.gz` suffix
    with fsspec.open(url, mode, compression="infer", **storage_options) as f:
        state = await get_state()
        # Write from a thread so we don't block the event loop quite as badly
        # (the writer will still hold the GIL a lot though).
        await to_thread(writer, state, f)
async def get_cluster_state(
    self,
    exclude: "Collection[str]",
) -> dict:
    """Produce the state dict used in a cluster state dump.

    Parameters
    ----------
    exclude:
        Attribute names to leave out. Passed to ``self._to_dict`` for the
        scheduler's own state and sent to the workers in the ``dump_state``
        message.

    Returns
    -------
    dict
        A dict with three keys: ``"scheduler"`` (the result of
        ``self._to_dict``), ``"workers"`` (the result of the ``dump_state``
        broadcast) and ``"versions"`` (``{"scheduler": self.versions(),
        "workers": <result of the "versions" broadcast>}``). A worker entry
        that came back as an exception is replaced by the ``repr`` of that
        exception.
    """
    # Kick off state-dumping on workers before we block the event loop in
    # `self._to_dict`.
    workers_future = asyncio.gather(
        self.broadcast(
            msg={"op": "dump_state", "exclude": exclude},
            on_error="return",
        ),
        self.broadcast(
            msg={"op": "versions"},
            on_error="ignore",
        ),
    )
    try:
        scheduler_state = self._to_dict(exclude=exclude)

        worker_states, worker_versions = await workers_future
    finally:
        # Ensure the tasks aren't left running if anything fails.
        # Someday (py3.11), use a trio-style TaskGroup for this.
        workers_future.cancel()

    # Convert any RPC errors to strings
    worker_states = {
        k: repr(v) if isinstance(v, Exception) else v
        for k, v in worker_states.items()
    }

    return {
        "scheduler": scheduler_state,
        "workers": worker_states,
        "versions": {"scheduler": self.versions(), "workers": worker_versions},
    }


async def dump_cluster_state_to_url(
    self,
    url: str,
    exclude: "Collection[str]",
    format: Literal["msgpack", "yaml"],
    # The annotation on ``**kwargs`` describes each *value*, hence ``Any``.
    **storage_options: Any,
) -> None:
    """Write a cluster state dump to an fsspec-compatible URL.

    Parameters
    ----------
    url:
        Destination handed to ``cluster_dump.write_state``.
    exclude:
        Attribute names to leave out; forwarded to ``get_cluster_state``.
    format:
        ``"msgpack"`` or ``"yaml"``; forwarded to ``cluster_dump.write_state``.
    **storage_options:
        Extra keyword arguments forwarded to ``cluster_dump.write_state``.
    """
    # Pass a callable rather than the finished state, so `write_state`
    # decides when the (expensive) state collection actually runs.
    await cluster_dump.write_state(
        partial(self.get_cluster_state, exclude), url, format, **storage_options
    )
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index b2f32d48ced..abd496e3eb1 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7260,23 +7260,23 @@ def test_print_simple(capsys): assert "Hello!:123" in out -def _verify_cluster_dump(path, _format: str, addresses: set[str]) -> dict: - path = str(path) - if _format == "msgpack": - import gzip +def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict: + fsspec = pytest.importorskip("fsspec") + url = str(url) + if format == "msgpack": import msgpack - path += ".msgpack.gz" - - with gzip.open(path) as fd_zip: - state = msgpack.unpack(fd_zip) + url += ".msgpack.gz" + loader = msgpack.unpack else: import yaml - path += ".yaml" - with open(path) as fd_plain: - state = yaml.safe_load(fd_plain) + url += ".yaml" + loader = yaml.safe_load + + with fsspec.open(url, mode="rb", compression="infer") as f: + state = loader(f) assert isinstance(state, dict) assert "scheduler" in state @@ -7286,25 +7286,63 @@ def _verify_cluster_dump(path, _format: str, addresses: set[str]) -> dict: return state +def test_dump_cluster_state_write_from_scheduler( + c, s, a, b, tmp_path, monkeypatch: pytest.MonkeyPatch +): + monkeypatch.chdir(tmp_path) + + scheduler_dir = tmp_path / "scheduler" + scheduler_dir.mkdir() + c.run_on_scheduler(os.chdir, str(scheduler_dir)) + + c.dump_cluster_state("not-url") + assert (tmp_path / "not-url.msgpack.gz").is_file() + + c.dump_cluster_state("file://is-url") + assert (scheduler_dir / "is-url.msgpack.gz").is_file() + + c.dump_cluster_state("file://local-explicit", write_from_scheduler=False) + assert (tmp_path / "local-explicit.msgpack.gz").is_file() + + c.dump_cluster_state("scheduler-explicit", write_from_scheduler=True) + assert (scheduler_dir / "scheduler-explicit.msgpack.gz").is_file() + + +@pytest.mark.parametrize("local", [True, False]) @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) -def 
test_dump_cluster_state_sync(c, s, a, b, tmp_path, _format): +def test_dump_cluster_state_sync(c, s, a, b, tmp_path, _format, local): filename = tmp_path / "foo" + if not local: + pytest.importorskip("fsspec") + # Make it look like an fsspec path + filename = f"file://{filename}" c.dump_cluster_state(filename, format=_format) _verify_cluster_dump(filename, _format, {a["address"], b["address"]}) +@pytest.mark.parametrize("local", [True, False]) @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) @gen_cluster(client=True) -async def test_dump_cluster_state_async(c, s, a, b, tmp_path, _format): +async def test_dump_cluster_state_async(c, s, a, b, tmp_path, _format, local): filename = tmp_path / "foo" + if not local: + pytest.importorskip("fsspec") + # Make it look like an fsspec path + filename = f"file://{filename}" await c.dump_cluster_state(filename, format=_format) _verify_cluster_dump(filename, _format, {a.address, b.address}) +@pytest.mark.parametrize("local", [True, False]) @gen_cluster(client=True) -async def test_dump_cluster_state_json(c, s, a, b, tmp_path): +async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local): + filename = tmp_path / "foo" + if not local: + pytest.importorskip("fsspec") + # Make it look like an fsspec path + filename = f"file://{filename}" with pytest.raises(ValueError, match="Unsupported format"): - await c.dump_cluster_state(tmp_path / "foo", format="json") + await c.dump_cluster_state(filename, format="json") @gen_cluster(client=True) @@ -7363,19 +7401,6 @@ async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path): assert k in s.tasks -@gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "200ms"}) -async def test_dump_cluster_state_error(c, s, a, b, tmp_path): - a.stop() - filename = tmp_path / "foo" - await c.dump_cluster_state(filename, format="yaml") - state = _verify_cluster_dump(filename, "yaml", {a.address, b.address}) - assert state["workers"][a.address] == ( - f"OSError('Timed 
@pytest.mark.parametrize(
    # `node` rather than `input`, so the builtin is not shadowed
    "node, expected",
    [
        ([1, 2, 3], [1, 2, 3]),
        ((1, 2, 3), [1, 2, 3]),
        ({"x": (1, (2,))}, {"x": [1, [2]]}),
        ("foo", "foo"),
    ],
)
def test_tuple_to_list(node, expected):
    """Tuples at any depth become lists; other values compare equal to the input."""
    assert _tuple_to_list(node) == expected


async def get_state():
    """Stand-in state getter; the tuple entry exercises tuple handling on write."""
    return {"foo": "bar", "list": ["a"], "tuple": (1, "two", 3)}


@gen_test()
async def test_write_state_msgpack(tmp_path):
    """A msgpack dump lands at ``<path>.msgpack.gz`` and reads back through gzip."""
    path = str(tmp_path / "bar")
    await write_state(get_state, path, "msgpack")

    with fsspec.open(f"{path}.msgpack.gz", "rb", compression="gzip") as f:
        readback = msgpack.load(f)
    # The tuple comes back as a list, so compare against the converted state
    assert readback == _tuple_to_list(await get_state())


@gen_test()
async def test_write_state_yaml(tmp_path):
    """A YAML dump lands at ``<path>.yaml`` and carries no ``!!python/tuple`` tags."""
    path = str(tmp_path / "bar")
    await write_state(get_state, path, "yaml")

    with open(f"{path}.yaml") as f:
        readback = yaml.safe_load(f)
        assert readback == _tuple_to_list(await get_state())
        # Re-read the raw text to check the tag is absent from the file itself
        f.seek(0)
        assert "!!python/tuple" not in f.read()
def _verify_cluster_state(
    state: dict, workers: Collection[Worker], allow_missing: bool = False
) -> None:
    """Assert that ``state`` has the layout of a cluster state dump.

    Parameters
    ----------
    state:
        The dict to check; it must have exactly the keys ``"scheduler"``,
        ``"workers"`` and ``"versions"``.
    workers:
        Workers whose addresses must be exactly the keys of ``state["workers"]``.
    allow_missing:
        If True, ``state["versions"]["workers"]`` may cover only a subset of
        the worker addresses; otherwise it must cover all of them.
    """
    addrs = {w.address for w in workers}
    assert state.keys() == {"scheduler", "workers", "versions"}
    assert state["workers"].keys() == addrs
    if allow_missing:
        assert state["versions"]["workers"].keys() <= addrs
    else:
        assert state["versions"]["workers"].keys() == addrs


@gen_cluster(nthreads=[("", 1)] * 2)
async def test_get_cluster_state(s: Scheduler, *workers: Worker):
    """The state covers every worker, and is still produced once all are gone."""
    state = await s.get_cluster_state([])
    _verify_cluster_state(state, workers)

    await asyncio.gather(*(w.close() for w in workers))

    # Wait until the scheduler no longer tracks any worker
    while s.workers:
        await asyncio.sleep(0.01)

    state_no_workers = await s.get_cluster_state([])
    _verify_cluster_state(state_no_workers, [])


@gen_cluster(
    nthreads=[("", 1)] * 2,
    config={"distributed.comm.timeouts.connect": "200ms"},
)
async def test_get_cluster_state_worker_error(s: Scheduler, a: Worker, b: Worker):
    """A worker that cannot be reached is reported as the repr of the error."""
    a.stop()
    state = await s.get_cluster_state([])
    _verify_cluster_state(state, [a, b], allow_missing=True)
    assert state["workers"][a.address] == (
        f"OSError('Timed out trying to connect to {a.address} after 0.2 s')"
    )
    assert isinstance(state["workers"][b.address], dict)
    assert state["versions"]["workers"].keys() == {b.address}


def _verify_cluster_dump(url, format: str, workers: Collection[Worker]) -> dict:
    """Load the dump written for ``url`` and verify its layout.

    The suffix for ``format`` (``.msgpack.gz`` for ``"msgpack"``, ``.yaml`` for
    anything else) is appended to ``url`` before opening it with fsspec.
    Returns the loaded state.
    """
    import fsspec

    if format == "msgpack":
        import msgpack

        url += ".msgpack.gz"
        loader = msgpack.unpack

    else:
        import yaml

        url += ".yaml"
        loader = yaml.safe_load

    with fsspec.open(url, mode="rb", compression="infer") as f:
        state = loader(f)

    _verify_cluster_state(state, workers)
    return state


@pytest.mark.parametrize("format", ["msgpack", "yaml"])
@gen_cluster(nthreads=[("", 1)] * 2)
async def test_dump_cluster_state(s: Scheduler, *workers: Worker, format):
    """Dumps written by the scheduler to ``memory://`` URLs can be read back."""
    fsspec = pytest.importorskip("fsspec")
    try:
        await s.dump_cluster_state_to_url(
            "memory://state-dumps/two-workers", [], format
        )
        _verify_cluster_dump("memory://state-dumps/two-workers", format, workers)

        await asyncio.gather(*(w.close() for w in workers))

        # Wait until the scheduler no longer tracks any worker
        while s.workers:
            await asyncio.sleep(0.01)

        await s.dump_cluster_state_to_url("memory://state-dumps/no-workers", [], format)
        _verify_cluster_dump("memory://state-dumps/no-workers", format, [])
    finally:
        fs = fsspec.filesystem("memory")
        # If the first dump failed before writing anything there is nothing to
        # remove; an unconditional `rm` would raise on the missing path and
        # hide the failure that actually matters.
        if fs.exists("state-dumps"):
            fs.rm("state-dumps", recursive=True)