From bd0a699f6f84bfe3d9a7cfd4faa0eb6eb073c9a2 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 6 Jan 2023 10:47:30 +0000 Subject: [PATCH] Drop support for zict < 2.1.0 --- distributed/spill.py | 29 +++++-------------------- distributed/tests/test_spill.py | 10 +-------- distributed/tests/test_worker_memory.py | 15 +------------ 3 files changed, 8 insertions(+), 46 deletions(-) diff --git a/distributed/spill.py b/distributed/spill.py index b01c8e2df43..97d41ceb6af 100644 --- a/distributed/spill.py +++ b/distributed/spill.py @@ -16,7 +16,6 @@ from distributed.sizeof import safe_sizeof logger = logging.getLogger(__name__) -has_zict_210 = parse_version(zict.__version__) >= parse_version("2.1.0") has_zict_220 = parse_version(zict.__version__) >= parse_version("2.2.0") has_zict_230 = parse_version(zict.__version__) >= parse_version("2.3.0") @@ -139,10 +138,6 @@ def __init__( max_spill: int | Literal[False] = False, min_log_interval: float = 2, ): - - if max_spill is not False and not has_zict_210: - raise ValueError("zict >= 2.1.0 required to set max-spill") - slow: MutableMapping[str, Any] = Slow(spill_directory, max_spill) if has_zict_220: # If a value is still in use somewhere on the worker since the last time it @@ -195,11 +190,7 @@ def handle_errors(self, key: str | None) -> Iterator[None]: # This happens only when the key is individually larger than target. # The exception will be caught by Worker and logged; the status of # the task will be set to error. - if has_zict_210: - del self[key] - else: - assert key not in self.fast - assert key not in self.slow + del self[key] raise orig_e else: # The key we just inserted is smaller than target, but it caused @@ -236,11 +227,7 @@ def __setitem__(self, key: str, value: Any) -> None: super().__setitem__(key, value) self.logged_pickle_errors.discard(key) except HandledError: - if has_zict_210: - assert key in self.fast - else: - assert key not in self.fast - logger.error("Key %s lost. Please upgrade to zict >= 2.1.0", key) + assert key in self.fast assert key not in self.slow def evict(self) -> int: @@ -413,14 +400,10 @@ def __setitem__(self, key: str, value: Any) -> None: ) t1 = perf_counter() - if has_zict_210: - # Thanks to Buffer.__setitem__, we never update existing - # keys in slow, but always delete them and reinsert them. - assert key not in self.d - assert key not in self.weight_by_key - else: - self.d.pop(key, None) - self.total_weight -= self.weight_by_key.pop(key, SpilledSize(0, 0)) + # Thanks to Buffer.__setitem__, we never update existing + # keys in slow, but always delete them and reinsert them. + assert key not in self.d + assert key not in self.weight_by_key if ( self.max_weight is not False diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 51788d7cf1f..6acffb45902 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -13,13 +13,9 @@ from distributed import profile from distributed.compatibility import WINDOWS -from distributed.spill import SpillBuffer, has_zict_210, has_zict_220 +from distributed.spill import SpillBuffer, has_zict_220 from distributed.utils_test import captured_logger -requires_zict_210 = pytest.mark.skipif( - not has_zict_210, - reason="requires zict version >= 2.1.0", -) requires_zict_220 = pytest.mark.skipif( not has_zict_220, reason="requires zict version >= 2.2.0", @@ -130,7 +126,6 @@ def test_disk_size_calculation(tmp_path): assert_buf(buf, tmp_path, {}, {"a": a, "b": b}) -@requires_zict_210 def test_spillbuffer_maxlim(tmp_path_factory): buf_dir = tmp_path_factory.mktemp("buf") buf = SpillBuffer(str(buf_dir), target=200, max_spill=600, min_log_interval=0) @@ -213,7 +208,6 @@ def __sizeof__(self): return self.size -@requires_zict_210 def test_spillbuffer_fail_to_serialize(tmp_path): buf = SpillBuffer(str(tmp_path), target=200, max_spill=600, min_log_interval=0) @@ -247,7 +241,6 @@ def test_spillbuffer_fail_to_serialize(tmp_path): assert_buf(buf, tmp_path, {"b": b, "c": c}, {}) -@requires_zict_210 @pytest.mark.skipif(WINDOWS, reason="Needs chmod") def test_spillbuffer_oserror(tmp_path): buf = SpillBuffer(str(tmp_path), target=200, max_spill=800, min_log_interval=0) @@ -287,7 +280,6 @@ def test_spillbuffer_oserror(tmp_path): assert_buf(buf, tmp_path, {"b": b, "d": d}, {"a": a}) -@requires_zict_210 def test_spillbuffer_evict(tmp_path): buf = SpillBuffer(str(tmp_path), target=300, min_log_interval=0) diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py index 947df51ee32..e7fe5b024aa 100644 --- a/distributed/tests/test_worker_memory.py +++ b/distributed/tests/test_worker_memory.py @@ -21,7 +21,6 @@ from distributed.compatibility import MACOS, WINDOWS from distributed.core import Status from distributed.metrics import monotonic -from distributed.spill import has_zict_210 from distributed.utils_test import ( NO_AMM, captured_logger, @@ -39,11 +38,6 @@ TaskErredMsg, ) -requires_zict_210 = pytest.mark.skipif( - not has_zict_210, - reason="requires zict version >= 2.1.0", -) - def memory_monitor_running(dask_worker: Worker | Nanny) -> bool: return "memory_monitor" in dask_worker.periodic_callbacks @@ -282,17 +276,11 @@ async def test_fail_to_pickle_execute_2(c, s, a): y = c.submit(lambda: "y" * 256, key="y") await wait(y) - if has_zict_210: - assert set(a.data.memory) == {"x", "y"} - else: - assert set(a.data.memory) == {"y"} - + assert set(a.data.memory) == {"x", "y"} assert not a.data.disk - await assert_basic_futures(c) -@requires_zict_210 @gen_cluster( client=True, nthreads=[("", 1)], @@ -373,7 +361,6 @@ async def test_spill_target_threshold(c, s, a): assert set(a.data.disk) == {"y"} -@requires_zict_210 @gen_cluster( client=True, nthreads=[("", 1)],