Wait for n workers before continuing #2688
guillaumeeb
left a comment
This may very well be sufficient, and eventually supersedes dask/dask-jobqueue#223.
Maybe you will need to add some conditionals like in distributed/distributed/client.py, line 780 in 5938763.
I have one slight concern about the future: I'm not sure Cluster objects should have a Scheduler attribute if they are clearly separated as proposed in #2235.
One comment below that goes with @guillaumeeb's comment about not assuming the presence of the scheduler.
Also, this could use a test. Maybe something like the following:

    from time import time

    from tornado import gen

    from distributed import Worker
    from distributed.utils_test import gen_cluster

    @gen_cluster(client=True)
    def test_wait_for_workers(c, s, a, b):
        # a and b are already running, so waiting for a third worker must block
        future = c.wait_for_workers(n=3)
        yield gen.sleep(0.1)
        assert not future.done()
        w = yield Worker(s.address)
        start = time()
        yield future
        assert time() < start + 1
        yield w._close()

Also, any thoughts on how to future-proof the method name? I can imagine people wanting other methods like this in the future, and I'd prefer not to have many of them, but to simply add more keyword arguments to this one. Anything we can do to make this a place for such functionality in the future would be welcome.

    @gen.coroutine
    def _wait_until_n_workers(self, n):
        while n and len(self.cluster.scheduler.workers) < n:
            yield gen.sleep(0.01)

I agree with @guillaumeeb's point that we shouldn't assume the existence of either the scheduler or even the cluster attribute. The generic way to do this is to poll the scheduler's identity function.

    while True:
        info = yield self.scheduler.identity()
        if len(info['workers']) >= n:
            break
        else:
            yield gen.sleep(0.200)

The self.scheduler attribute used above is a connection to the scheduler that will always be there, rather than an explicit reference to the scheduler itself.
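A minimal, self-contained sketch of this polling pattern, written with asyncio instead of Tornado's gen.coroutine so that it runs without distributed installed. FakeScheduler, its add_worker method, and the poll_interval parameter are hypothetical stand-ins invented for illustration; only the shape of the identity() reply (a dict with a 'workers' mapping) is taken from the comment above.

```python
import asyncio


class FakeScheduler:
    """Hypothetical stand-in for the scheduler connection (not a distributed class)."""

    def __init__(self):
        self._workers = {}

    def add_worker(self, address):
        self._workers[address] = {}

    async def identity(self):
        # Mirrors the reply shape used above: a dict with a 'workers' mapping.
        return {"workers": dict(self._workers)}


async def wait_for_workers(scheduler, n_workers, poll_interval=0.01):
    """Poll identity() until at least n_workers workers are registered.

    Returns the number of polls that were needed.
    """
    polls = 0
    while True:
        info = await scheduler.identity()
        polls += 1
        if len(info["workers"]) >= n_workers:
            return polls
        await asyncio.sleep(poll_interval)


async def demo():
    scheduler = FakeScheduler()
    scheduler.add_worker("tcp://127.0.0.1:1")
    waiter = asyncio.ensure_future(wait_for_workers(scheduler, n_workers=2))
    await asyncio.sleep(0.05)
    still_waiting = not waiter.done()  # only one worker has registered so far
    scheduler.add_worker("tcp://127.0.0.1:2")
    polls = await waiter
    return still_waiting, polls
```

Because the loop only needs something with an identity() method, it does not depend on a local Scheduler or Cluster object, which is the point made in the comment above.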
Maybe we could do something like
Yes, something like that might work. We could probably use standard keyword arguments for this as well:

    def wait_for_workers(n_workers=None):
        ...

Then, as we add more things, we can add more keywords:

    def wait_for_workers(n_workers=None, memory=None, cores=None):
        ...
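One way the keyword-argument idea could be kept extensible is to evaluate every criterion in a single predicate over the scheduler's identity reply, so a new keyword only adds one more check. This is a sketch under assumptions: the helper name cluster_is_ready and the per-worker keys 'memory_limit' and 'nthreads' are chosen for illustration and are not confirmed by this thread.

```python
def cluster_is_ready(info, n_workers=None, memory=None, cores=None):
    """Return True when every criterion that was supplied is satisfied.

    `info` is assumed to look like
    {'workers': {address: {'memory_limit': int, 'nthreads': int}}};
    the per-worker keys are assumptions made for this sketch.
    Criteria left as None are ignored.
    """
    workers = list(info["workers"].values())
    if n_workers is not None and len(workers) < n_workers:
        return False
    if memory is not None and sum(w["memory_limit"] for w in workers) < memory:
        return False
    if cores is not None and sum(w["nthreads"] for w in workers) < cores:
        return False
    return True
```

A polling loop would then call cluster_is_ready(info, n_workers=..., memory=..., cores=...) on each identity() reply and stop once it returns True.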
@guillaumeeb what should we do when the scheduler isn't there? Currently I have it sleep until it appears, but maybe we should raise an error, or raise an error after a counter triggers.
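The "raise an error after a counter triggers" option could look roughly like the sketch below. It is an illustration only: wait_for_scheduler, get_identity, max_attempts, and poll_interval are hypothetical names, and it assumes an unreachable scheduler surfaces as an OSError (of which ConnectionError is a subclass).

```python
import asyncio


async def wait_for_scheduler(get_identity, max_attempts=5, poll_interval=0.01):
    """Call get_identity() until it succeeds, giving up after max_attempts tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await get_identity()
        except OSError:
            # Assumption: a missing scheduler shows up as OSError/ConnectionError.
            if attempt == max_attempts:
                raise TimeoutError(
                    "scheduler not reachable after %d attempts" % max_attempts
                )
            await asyncio.sleep(poll_interval)
```

Bounding the number of attempts keeps a misconfigured client from sleeping forever, at the cost of having to pick a limit that suits slow cluster start-up.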

    if workers:
        @gen.coroutine
        def f():
            info, _ = self._get_current_info_and_scheduler()

Thoughts on using yield self.scheduler.identity() here instead?
_get_current_info_and_scheduler helps make sure that we have a scheduler and a cluster, and that they are valid. It's a wrapper around the first part of _repr_html_.
Your suggested way seems fine to me, and was the first way I did it, but I was just doing what @guillaumeeb suggested. I am definitely less aware of all of the possible scenarios than you guys.
> Maybe you will need to add some conditionals like in
Is there no need to worry about that?
Please just use yield self.scheduler.identity(). It's simpler and cleaner. If you run into an issue with this then bring it up and we'll handle it. I think that you'll be fine though.
> helps make sure that we have a scheduler and a cluster, and that they are valid
We may not ever have a cluster object locally. That shouldn't be required.
If you want to wait until things are set up cleanly then you could yield self, but that should already have run, and I don't think it should be necessary.

    while n and (yield func()) < n:
        yield gen.sleep(0.2)

    def wait_until_n(self, workers=0):

I recommend the name wait_for_workers instead. I'm not sure that a novice user will understand as immediately what wait_until_n means.

    def f():
        info = yield self.scheduler.identity()
        raise gen.Return(len(info['workers']))
    return self.sync(self._wait_for_workers, f, n_workers)

Rather than have the nested coroutine here I recommend that you unpack the definition of f in _wait_for_workers. I think that this should do it.

    @gen.coroutine  # needed so the generator runs as a Tornado coroutine
    def _wait_for_workers(self, n_workers=0):
        info = yield self.scheduler.identity()
        while len(info['workers']) < n_workers:
            yield gen.sleep(0.1)
            info = yield self.scheduler.identity()

    def wait_for_workers(self, n_workers=0):
        return self.sync(self._wait_for_workers, n_workers=n_workers)

This looks good to me. There are a couple of unrelated intermittent failures. Also it looks like there are some linting issues. I recommend using See also https://travis-ci.org/dask/distributed/jobs/532348315
This looks good to me. Thanks @danpf. Merging
This is the easiest and, I think, most well-supported implementation of waiting for n workers.
Initially I thought it was wrong for this to be in Client, but considering the implementation of Cluster (the bulk of which is in LocalCluster), this is the solution that is most immediately compatible with jobqueue and kubernetes.
The alternative would be to move asynchronous and loop from LocalCluster to Cluster, and then make this a top-level member of Cluster. In addition, jobqueue would need to inherit from Cluster instead of ClusterManager. This is currently tested and working on SLURMCluster; I will think of a way to test it without jobqueue once we settle on the correct path.
I'm okay with both ways, but figured I would start with this since it's so low effort.
@mrocklin @guillaumeeb
fix #2138