As seen in dask/dask-jobqueue#112 and the related PR dask/dask-jobqueue#97, `Cluster.scale` behaves unreliably when it is called several times in a row.
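I suspect part of this problem comes from how asynchrony is used in `Cluster.scale`: the scheduler state is read right away, but `scale_up` is called asynchronously, so something can happen (here: another call to `scale`) between state retrieval and the effective `scale_up`.

If we want `scale` to run asynchronously, I propose simply adding a `_scale()` method (a coroutine?) that `scale()` calls asynchronously. In `_scale`, we would read the state and perform the modifications in the same step: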
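```python
import logging
from distributed.utils import log_errors
from tornado import gen

logger = logging.getLogger(__name__)

@gen.coroutine
def _scale(self, n):
    with log_errors():
        if n >= len(self.scheduler.workers):
            # Not enough workers: scale up to n
            self.scale_up(n)
        else:
            # Too many workers: let the scheduler choose which ones to close
            to_close = self.scheduler.workers_to_close(
                n=len(self.scheduler.workers) - n)
            logger.debug("Closing workers: %s", to_close)
            # retire_workers is a coroutine, so wait for it before scaling down
            yield self.scheduler.retire_workers(workers=to_close)
            self.scale_down(to_close)
```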
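To illustrate the race, here is a minimal, self-contained sketch using plain `asyncio` instead of the real scheduler. `ToyCluster`, `scale_racy` and `demo` are hypothetical names invented for this example, and workers are just integer ids. `scale_racy` mimics reading the state immediately and deferring the action, while `scale` defers a `_scale` coroutine that reads and acts in one step.

```python
import asyncio


class ToyCluster:
    """Hypothetical stand-in for a Cluster: workers are plain integer ids."""

    def __init__(self):
        self.workers = []

    def scale_up(self, n):
        # Start only the workers missing to reach n (no-op if already >= n)
        self.workers.extend(range(len(self.workers), n))

    def scale_down(self, to_close):
        self.workers = [w for w in self.workers if w not in to_close]

    def scale_racy(self, n):
        # Racy pattern: the state is read now, but the action runs later
        loop = asyncio.get_running_loop()
        if n >= len(self.workers):
            loop.call_soon(self.scale_up, n)
        else:
            loop.call_soon(self.scale_down, self.workers[n:])

    async def _scale(self, n):
        # Proposed pattern: read the state and act on it in the same step
        if n >= len(self.workers):
            self.scale_up(n)
        else:
            self.scale_down(self.workers[n:])

    def scale(self, n):
        # Schedule _scale asynchronously; tasks start in creation order
        return asyncio.get_running_loop().create_task(self._scale(n))


async def demo():
    racy = ToyCluster()
    racy.scale_racy(10)
    racy.scale_racy(2)  # still sees 0 workers, so it also schedules scale_up
    await asyncio.sleep(0)  # let the scheduled callbacks run

    fixed = ToyCluster()
    await asyncio.gather(fixed.scale(10), fixed.scale(2))
    return len(racy.workers), len(fixed.workers)


print(asyncio.run(demo()))  # (10, 2)
```

With the racy pattern, the second call still sees an empty cluster and schedules a no-op `scale_up(2)`, so the cluster ends with 10 workers; with `_scale`, the second call runs after the first has taken effect and correctly scales down to 2.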
@jhamman @mrocklin any opinion, advice?