The current implementation of the worker's ensure_communicating keeps fetching dependencies for as long as there are dependencies to fetch, even while the worker is paused. This can push an already overloaded worker over the edge and cause it to fail.

    while self.data_needed and (
        len(self.in_flight_workers) < self.total_out_connections
        or self.comm_nbytes < self.comm_threshold_bytes
    ):

A paused worker should not be allowed to fetch more data.
There are two possible ways to achieve this:
- Add another guard to ensure_communicating to stop scheduling additional gather_dep coroutines.
- Remove all tasks from a paused worker that aren't in memory. This would indirectly empty the data_needed heap and cause the worker to stabilize. This could be achieved either by aggressive stealing or by implementing a custom scheduler handler.
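
To make the first option concrete, here is a minimal sketch of what the extra guard could look like. It assumes a toy stand-in class (ToyWorker is hypothetical, not the real worker), keeps data_needed as a plain heap of (priority, key, nbytes) tuples, and simplifies in_flight_workers to a key-to-size mapping. Only the loop condition mirrors the excerpt above; everything else is illustrative.

```python
import heapq
from enum import Enum


class Status(Enum):
    running = "running"
    paused = "paused"


class ToyWorker:
    """Simplified, hypothetical stand-in; not the real distributed worker."""

    def __init__(self, total_out_connections=2, comm_threshold_bytes=100):
        self.status = Status.running
        self.data_needed = []  # heap of (priority, key, nbytes)
        self.in_flight_workers = {}  # simplified here: key -> nbytes
        self.total_out_connections = total_out_connections
        self.comm_threshold_bytes = comm_threshold_bytes
        self.comm_nbytes = 0

    def ensure_communicating(self):
        """Start fetches while limits allow; return the keys started."""
        started = []
        while (
            self.status != Status.paused  # proposed extra guard
            and self.data_needed
            and (
                len(self.in_flight_workers) < self.total_out_connections
                or self.comm_nbytes < self.comm_threshold_bytes
            )
        ):
            _, key, nbytes = heapq.heappop(self.data_needed)
            self.in_flight_workers[key] = nbytes
            self.comm_nbytes += nbytes
            started.append(key)
        return started
```

With this guard, pending keys simply stay in data_needed while the worker is paused and are picked up on the next call after it resumes. Whether that is preferable to removing the tasks from the worker is exactly the open question.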
I think both options have a certain appeal. I'm wondering which one is the better choice, specifically in the context of the latest changes to AMM / retirement / pause.
cc @crusaderky
Note: right now, network traffic is only restricted for egress, i.e. incoming get_data requests from other workers, see

    if self.status == Status.paused:
        max_connections = 1
        throttle_msg = " Throttling outgoing connections because worker is paused."
    else:
        throttle_msg = ""

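
For contrast with the ingress side, the egress throttling can be sketched roughly as follows. This is a hedged illustration only: accept_get_data, active_transfers, and connection_limit are hypothetical names, and the surrounding logic of the real handler is not shown in this issue.

```python
from enum import Enum


class Status(Enum):
    running = "running"
    paused = "paused"


def accept_get_data(status, active_transfers, connection_limit):
    """Decide whether a toy worker serves one more incoming get_data request.

    active_transfers and connection_limit are hypothetical parameters.
    """
    max_connections = connection_limit
    if status == Status.paused:
        # Same rule as the excerpt above: a paused worker serves one peer at a time.
        max_connections = 1
    return active_transfers < max_connections
```

Note that nothing in this sketch limits what the paused worker itself requests from its peers, which is the gap described in this issue.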
Code references, both in distributed/distributed/worker.py at 8734c9d: lines 2684 to 2687 (the ensure_communicating loop) and lines 1712 to 1716 (the throttling of get_data on a paused worker).