Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion distributed/worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from collections.abc import Callable, MutableMapping
from contextlib import suppress
from functools import partial
from time import monotonic

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Windows, this has a granularity of 15ms. Could you use #6181 instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe distributed.metrics.time would suffice?

from typing import TYPE_CHECKING, Any, Container, Literal, cast

import psutil
Expand Down Expand Up @@ -225,6 +226,7 @@ async def _maybe_spill(self, worker: Worker, memory: int) -> None:
"Worker is at %.0f%% memory usage. Start spilling data to disk.",
frac * 100,
)
start = monotonic()
# Implement hysteresis cycle where spilling starts at the spill threshold and
# stops at the target threshold. Normally that here the target threshold defines
# process memory, whereas normally it defines reported managed memory (e.g.
Expand Down Expand Up @@ -255,7 +257,13 @@ async def _maybe_spill(self, worker: Worker, memory: int) -> None:

total_spilled += weight
count += 1
await asyncio.sleep(0)
# If the current buffer is filled with a lot of small values,
# evicting one at a time is very slow and the worker might
# generate new data faster than it is able to evict. Therefore,
# only pass on control if we spent at least 0.5s evicting
if monotonic() - start > 0.5:
await asyncio.sleep(0)
start = monotonic()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by both the old and the new solution here. It seems like we're staying stuck in a while loop burning the CPU at 100%. Is this correct? If so, I'm against this design generally. (although I'm not objecting to this change, since it doesn't necessarily make things worse from my perspective.

Is my understanding correct here that we're blocking the event loop?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was flawed here. @crusaderky and @fjetter set me straight here. I didn't realize that data.evict was blocking.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's blocking the event loop. No, it's not burning CPU at 100%.

I think the idea here is that it's a crude form of asyncio scheduling. You're basically saying "stop everything! there's nothing that could possibly be more important to do (launch tasks, fetch data, send data, talk to scheduler, etc.) than dumping data to disk right now." That's a bit of a dramatic approach, but not too wrong for this situation. Spilling to disk should probably be the highest priority thing we do at this moment. And ideally starting new tasks, fetching new data, or anything else that could produce memory probably should pause until eviction is done.

This only works because we don't have async disk #4424. With async writes, we'd have to figure out how to actually pause other subsystems without blocking the whole event loop in this situation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't disagree with any of that. My previous understanding was flawed. Please ignore my previous comment. I'm good here.

I'm totally fine passing control periodically.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not burning CPU at 100%.

This is hardly surprising - when a write() is stuck waiting on a slow network file system, it shouldn't burn CPU.


memory = worker.monitor.get_process_memory()
if total_spilled > need and memory > target:
Expand Down