From 63622b38d1fbd9f0d8cee92c6c99c4ffaf47a2e1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 21 Apr 2022 14:47:02 -0700 Subject: [PATCH] Hold event loop while evicting --- distributed/worker_memory.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index f08dacdb2c2..4d2c5fc05ac 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -28,6 +28,7 @@ from collections.abc import Callable, MutableMapping from contextlib import suppress from functools import partial +from time import monotonic from typing import TYPE_CHECKING, Any, Container, Literal, cast import psutil @@ -225,6 +226,7 @@ async def _maybe_spill(self, worker: Worker, memory: int) -> None: "Worker is at %.0f%% memory usage. Start spilling data to disk.", frac * 100, ) + start = monotonic() # Implement hysteresis cycle where spilling starts at the spill threshold and # stops at the target threshold. Normally that here the target threshold defines # process memory, whereas normally it defines reported managed memory (e.g. @@ -255,7 +257,13 @@ async def _maybe_spill(self, worker: Worker, memory: int) -> None: total_spilled += weight count += 1 - await asyncio.sleep(0) + # If the current buffer is filled with a lot of small values, + # evicting one at a time is very slow and the worker might + # generate new data faster than it is able to evict. Therefore, + # only pass on control if we spent at least 0.5s evicting + if monotonic() - start > 0.5: + await asyncio.sleep(0) + start = monotonic() memory = worker.monitor.get_process_memory() if total_spilled > need and memory > target: