Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions distributed/spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from distributed.sizeof import safe_sizeof

logger = logging.getLogger(__name__)
has_zict_210 = parse_version(zict.__version__) >= parse_version("2.1.0")
has_zict_220 = parse_version(zict.__version__) >= parse_version("2.2.0")
has_zict_230 = parse_version(zict.__version__) >= parse_version("2.3.0")

Expand Down Expand Up @@ -139,10 +138,6 @@ def __init__(
max_spill: int | Literal[False] = False,
min_log_interval: float = 2,
):

if max_spill is not False and not has_zict_210:
raise ValueError("zict >= 2.1.0 required to set max-spill")

slow: MutableMapping[str, Any] = Slow(spill_directory, max_spill)
if has_zict_220:
# If a value is still in use somewhere on the worker since the last time it
Expand Down Expand Up @@ -195,11 +190,7 @@ def handle_errors(self, key: str | None) -> Iterator[None]:
# This happens only when the key is individually larger than target.
# The exception will be caught by Worker and logged; the status of
# the task will be set to error.
if has_zict_210:
del self[key]
else:
assert key not in self.fast
assert key not in self.slow
del self[key]
raise orig_e
else:
# The key we just inserted is smaller than target, but it caused
Expand Down Expand Up @@ -236,11 +227,7 @@ def __setitem__(self, key: str, value: Any) -> None:
super().__setitem__(key, value)
self.logged_pickle_errors.discard(key)
except HandledError:
if has_zict_210:
assert key in self.fast
else:
assert key not in self.fast
logger.error("Key %s lost. Please upgrade to zict >= 2.1.0", key)
assert key in self.fast
assert key not in self.slow

def evict(self) -> int:
Expand Down Expand Up @@ -413,14 +400,10 @@ def __setitem__(self, key: str, value: Any) -> None:
)
t1 = perf_counter()

if has_zict_210:
# Thanks to Buffer.__setitem__, we never update existing
# keys in slow, but always delete them and reinsert them.
assert key not in self.d
assert key not in self.weight_by_key
else:
self.d.pop(key, None)
self.total_weight -= self.weight_by_key.pop(key, SpilledSize(0, 0))
# Thanks to Buffer.__setitem__, we never update existing
# keys in slow, but always delete them and reinsert them.
assert key not in self.d
assert key not in self.weight_by_key

if (
self.max_weight is not False
Expand Down
10 changes: 1 addition & 9 deletions distributed/tests/test_spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@

from distributed import profile
from distributed.compatibility import WINDOWS
from distributed.spill import SpillBuffer, has_zict_210, has_zict_220
from distributed.spill import SpillBuffer, has_zict_220
from distributed.utils_test import captured_logger

requires_zict_210 = pytest.mark.skipif(
not has_zict_210,
reason="requires zict version >= 2.1.0",
)
requires_zict_220 = pytest.mark.skipif(
not has_zict_220,
reason="requires zict version >= 2.2.0",
Expand Down Expand Up @@ -130,7 +126,6 @@ def test_disk_size_calculation(tmp_path):
assert_buf(buf, tmp_path, {}, {"a": a, "b": b})


@requires_zict_210
def test_spillbuffer_maxlim(tmp_path_factory):
buf_dir = tmp_path_factory.mktemp("buf")
buf = SpillBuffer(str(buf_dir), target=200, max_spill=600, min_log_interval=0)
Expand Down Expand Up @@ -213,7 +208,6 @@ def __sizeof__(self):
return self.size


@requires_zict_210
def test_spillbuffer_fail_to_serialize(tmp_path):
buf = SpillBuffer(str(tmp_path), target=200, max_spill=600, min_log_interval=0)

Expand Down Expand Up @@ -247,7 +241,6 @@ def test_spillbuffer_fail_to_serialize(tmp_path):
assert_buf(buf, tmp_path, {"b": b, "c": c}, {})


@requires_zict_210
@pytest.mark.skipif(WINDOWS, reason="Needs chmod")
def test_spillbuffer_oserror(tmp_path):
buf = SpillBuffer(str(tmp_path), target=200, max_spill=800, min_log_interval=0)
Expand Down Expand Up @@ -287,7 +280,6 @@ def test_spillbuffer_oserror(tmp_path):
assert_buf(buf, tmp_path, {"b": b, "d": d}, {"a": a})


@requires_zict_210
def test_spillbuffer_evict(tmp_path):
buf = SpillBuffer(str(tmp_path), target=300, min_log_interval=0)

Expand Down
15 changes: 1 addition & 14 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from distributed.compatibility import MACOS, WINDOWS
from distributed.core import Status
from distributed.metrics import monotonic
from distributed.spill import has_zict_210
from distributed.utils_test import (
NO_AMM,
captured_logger,
Expand All @@ -39,11 +38,6 @@
TaskErredMsg,
)

requires_zict_210 = pytest.mark.skipif(
not has_zict_210,
reason="requires zict version >= 2.1.0",
)


def memory_monitor_running(dask_worker: Worker | Nanny) -> bool:
return "memory_monitor" in dask_worker.periodic_callbacks
Expand Down Expand Up @@ -282,17 +276,11 @@ async def test_fail_to_pickle_execute_2(c, s, a):

y = c.submit(lambda: "y" * 256, key="y")
await wait(y)
if has_zict_210:
assert set(a.data.memory) == {"x", "y"}
else:
assert set(a.data.memory) == {"y"}

assert set(a.data.memory) == {"x", "y"}
assert not a.data.disk

await assert_basic_futures(c)


@requires_zict_210
@gen_cluster(
client=True,
nthreads=[("", 1)],
Expand Down Expand Up @@ -373,7 +361,6 @@ async def test_spill_target_threshold(c, s, a):
assert set(a.data.disk) == {"y"}


@requires_zict_210
@gen_cluster(
client=True,
nthreads=[("", 1)],
Expand Down