SGE implementation #27

Merged: jhamman merged 14 commits into dask:master from lesteve:sge on Apr 12, 2018

@lesteve lesteve commented Apr 4, 2018

Member

I revived #6 and added simple tests. I was able to get test_basic to pass but not test_adaptive yet.

I'll wait until #25 is merged before working on this further.

Fix #3. Closes #6.

@jhamman jhamman left a comment

Member

Thanks for resurrecting this one. Just a few questions on the tests for now.

Comment thread dask_jobqueue/tests/test_sge.py Outdated


@pytest.mark.skipif('SGE_ACCOUNT' in os.environ, reason='SGE_ACCOUNT defined') # noqa: F811
def test_errors(loop):

Member

I don't think this test is relevant anymore.

Member Author

Yeah I was wondering about that and I felt the same way.

Member

I think that this test came from a cluster where project= was necessary to run, and was often set implicitly with an environment variable (perhaps PBS_ACCOUNT on the cluster on which I was working at the time). If this is not generally true for SGE clusters then I agree that this test should be removed.

Comment thread dask_jobqueue/tests/test_sge.py Outdated
start = time()
while cluster.jobs:
    sleep(0.100)
    assert time() < start + 10

Member

What is this testing exactly? @mrocklin - given the recent development of Adaptive, can you help us flesh out what kind of tests we should be looking at here?

Member

I've simplified the test for now.

Member

(I hope you don't mind my pushing to your fork @lesteve )

Member Author

> I've simplified the test for now.

Thanks!

> (I hope you don't mind my pushing to your fork @lesteve )

Not at all, pushing to a PR's fork is a feature I use quite often myself; it is very convenient and saves a lot of back and forth for minor things!

* use submit_command="qsub -terse" to make sure only the job id is returned.
lesteve added 3 commits April 11, 2018 16:43
Remove test_errors. PBS_ACCOUNT does not have an equivalent in SGE.
@lesteve lesteve changed the title [WIP] SGE implementation SGE implementation Apr 11, 2018

lesteve commented Apr 11, 2018

Member Author

OK, I think this is in a mergeable state, comments more than welcome! It is a first implementation of SGE (probably by no means perfect) and makes it possible to test some of the common code in dask_jobqueue/core.py across schedulers.

A few comments:

  • I could not find an equivalent of PBS_ACCOUNT in SGE so I removed the associated code and test_errors test
  • AFAICT adaptive is currently broken, so I am leaving test_adaptive for later.

More details about the adaptive problems I saw (maybe related to #26, not sure). According to

    def scale_down(self, workers):
        if isinstance(workers, dict):
            names = {v['name'] for v in workers.values()}
            job_ids = {name.split('-')[-2] for name in names}
            self.stop_workers(job_ids)

scale_down only does something if isinstance(workers, dict). The problem is that workers is a list of tcp://... worker addresses. So scale_down does nothing, i.e. self.jobs stays the same, so that scale_up doesn't see the need to schedule more jobs:

    def scale_up(self, n, **kwargs):
        return self.start_workers(n - len(self.jobs))

In other words, your *Cluster Python object thinks it still has enough workers, but your scheduling system has no jobs in the running state that can process the work.

I guess this is all fixable by looking at the doc, the distributed.LocalCluster implementation and possibly dask_drmaa as well. I may not find time to look at this for a few days so if anyone wants to pick this up, please be my guest!
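
The bookkeeping problem described in this comment can be reproduced with a toy class. This is only a sketch: `ToyCluster`, its job-id counter, the worker addresses, and the `dask-worker-<job id>-<n>` worker name are hypothetical stand-ins, and only the `scale_up`/`scale_down` bodies follow the snippets quoted above.

```python
import itertools


class ToyCluster:
    """Hypothetical stand-in for a *Cluster object; it only tracks job ids."""

    def __init__(self):
        self.jobs = {}
        self._ids = itertools.count(1)  # fake job ids: "1", "2", ...

    def start_workers(self, n):
        # Pretend to submit max(n, 0) jobs and remember their ids.
        for _ in range(max(n, 0)):
            self.jobs[str(next(self._ids))] = "running"

    def stop_workers(self, job_ids):
        for job_id in job_ids:
            self.jobs.pop(job_id, None)

    def scale_up(self, n, **kwargs):
        return self.start_workers(n - len(self.jobs))

    def scale_down(self, workers):
        # Same shape as the quoted snippet: only dict input is handled.
        if isinstance(workers, dict):
            names = {v["name"] for v in workers.values()}
            job_ids = {name.split("-")[-2] for name in names}
            self.stop_workers(job_ids)


cluster = ToyCluster()
cluster.scale_up(2)

# A list of worker addresses is silently ignored, so self.jobs is unchanged.
cluster.scale_down(["tcp://10.0.0.1:40000", "tcp://10.0.0.2:40001"])
jobs_after_list = len(cluster.jobs)

# scale_up then computes 2 - len(self.jobs) == 0 and submits nothing new.
cluster.scale_up(2)
jobs_after_scale_up = len(cluster.jobs)

# A dict of worker info (hypothetical name format) does remove a job.
cluster.scale_down({"tcp://10.0.0.1:40000": {"name": "dask-worker-1-0"}})
jobs_after_dict = len(cluster.jobs)
```

In this toy setting the list call leaves both jobs registered, which is the situation where the Python object and the scheduling system disagree.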

@guillaumeeb guillaumeeb left a comment

Member

Some minor fixes and one question, but this looks really fine to me. Thanks!

Comment thread dask_jobqueue/sge.py Outdated
project=None,
resource_spec=None,
walltime='0:30:00',
interface=None,

Member

I believe interface=None should not be here. It is a keyword from the parent class, and you don't seem to use it.

Member Author

Good catch, done.

Comment thread dask_jobqueue/sge.py Outdated
project : str
    Accounting string associated with each worker job. Passed to
    `#$ -A` option.
threads_per_worker : int

Member

For all the parameters inherited from JobQueueCluster, you should use the same mechanism as in PBSCluster or SLURMCluster, with the docstrings module: %(JobQueueCluster.parameters)s

Member Author

Done.

Member

I believe the sge.py file is still missing an import and a decorator, e.g.

from .core import JobQueueCluster, docstrings

and
@docstrings.with_indent(4)

You also still have the description of interface kw in the docstring!

Member Author

Ah right I fixed that, I am not so familiar with the docrep magic!
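
For readers equally unfamiliar with the mechanism, the idea behind a placeholder such as `%(JobQueueCluster.parameters)s` can be imitated with plain %-formatting. The sketch below does not use docrep itself: `inherit_parameters`, `ToySGECluster`, and the parameter text are hypothetical, and the real `docstrings` object in dask_jobqueue/core.py may behave differently in its details.

```python
import textwrap

# Hypothetical text standing in for the parent's "Parameters" section.
PARENT_PARAMETERS = """\
name : str
    Name of Dask workers.
threads : int
    Number of threads per process."""


def inherit_parameters(cls):
    """Hypothetical decorator: fill the placeholder in the class docstring."""
    # Indent continuation lines by 4 spaces so they line up with the
    # placeholder, which already sits at a 4-space indent.
    block = textwrap.indent(PARENT_PARAMETERS, "    ").lstrip()
    cls.__doc__ = cls.__doc__ % {"JobQueueCluster.parameters": block}
    return cls


@inherit_parameters
class ToySGECluster:
    """Toy cluster used only to show the substitution.

    Parameters
    ----------
    queue : str
        Destination queue for each worker job.
    %(JobQueueCluster.parameters)s
    """


filled_doc = ToySGECluster.__doc__
```

The point of the pattern is that parameters inherited from the parent class are documented once and reused, so a subclass docstring only lists what it adds.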

Comment thread dask_jobqueue/core.py
 with self.job_file() as fn:
-    out = self._call([self.submit_command, fn])
-    job = out.decode().split('.')[0]
+    out = self._call(shlex.split(self.submit_command) + [fn])

Member

I don't know shlex; is this mandatory? What purpose does it serve?

@lesteve lesteve Apr 12, 2018

Member Author

In SGE, qsub returns quite verbose output by default, e.g. something like Your job 56 ("test.sh") has been submitted. For just the job id to be returned, you need to use qsub -terse. This is why submit_command = 'qsub -terse'.

That means that you need to split submit_command. I think shlex.split is the way to do it for sh commands. We could just do submit_command.split(' '), but that may break, e.g. if one of the arguments is quoted with a space inside.

Member

Ok that's perfect then!
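
The difference between the two splitting approaches discussed in this thread can be seen with a short standard-library sketch. The script name and the quoted `-N "my job"` argument are made up purely to show an argument containing a space.

```python
import shlex

submit_command = "qsub -terse"
fn = "job_script.sh"  # hypothetical job script path

# Argument list as it would be handed to a subprocess call.
argv = shlex.split(submit_command) + [fn]

# A command with a quoted argument that contains a space (illustrative only).
quoted_command = 'qsub -terse -N "my job"'

# Naive splitting cuts the quoted argument in two and keeps the quotes.
naive = quoted_command.split(" ")

# shlex.split follows shell-like quoting rules and keeps it as one argument.
robust = shlex.split(quoted_command)
```

Here `naive` ends with the two fragments `'"my'` and `'job"'`, while `robust` ends with the single argument `'my job'`.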

Comment thread dask_jobqueue/sge.py
'resource_spec': resource_spec,}
self.job_header = self._header_template % self.config

logger.debug("Job script: \n %s" % self.job_script())

Member

We should probably debug log only the header here, but this is also true for PBS or SLURM, and it is really a minor detail...

@guillaumeeb

Member

And about your other remarks:

  • I totally agree with removing the PBS_ACCOUNT part and the corresponding test.
  • For adaptive, you're completely right about the scale_down method, though I don't know if it also breaks scale_up. If you started with, let's say, only 1 worker, scale_up should probably be called at some point, and not scale_down, so something should happen. But yeah, we should look elsewhere!

@guillaumeeb guillaumeeb mentioned this pull request Apr 11, 2018
* Remove interface parameters
* Update docstring which was out of date

lesteve commented Apr 12, 2018

Member Author

@guillaumeeb thanks for the review! I addressed your comments; let me know if you have any further ones!

Comment thread dask_jobqueue/sge.py Outdated
    option.
walltime : str
    Walltime for each worker job.
interface : str

Member

You still have the interface keyword in this docstring; once it is removed I think we're done :)

Member Author

Arrggh good point, I just pushed the fix.

Comment thread dask_jobqueue/core.py
-    out = self._call([self.submit_command, fn])
-    job = out.decode().split('.')[0]
+    out = self._call(shlex.split(self.submit_command) + [fn])
+    job = out.decode().split('.')[0].strip()

Member Author

And by the way the .strip() here is necessary because qsub -terse output finishes with a \n.
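
A minimal sketch of that parsing step, assuming byte output shaped like the examples in this thread. `parse_job_id` is a hypothetical helper, and the PBS-style `1234.headnode` string is an assumed format included only to show why the split on `.` is there.

```python
def parse_job_id(out: bytes) -> str:
    """Hypothetical helper mirroring the job id parsing line in the diff."""
    return out.decode().split(".")[0].strip()


sge_out = b"56\n"             # assumed `qsub -terse` output: id plus newline
pbs_out = b"1234.headnode\n"  # assumed PBS-style output: id.server

# Without .strip() the trailing newline would stay part of the SGE job id.
unstripped = sge_out.decode().split(".")[0]

sge_id = parse_job_id(sge_out)
pbs_id = parse_job_id(pbs_out)
```

For the PBS-style string the split already drops the newline along with the server suffix, so the `.strip()` only matters when the output contains no dot.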

@jhamman jhamman merged commit ef15363 into dask:master Apr 12, 2018

jhamman commented Apr 12, 2018

Member

Thanks @lesteve!


mrocklin commented Apr 12, 2018 via email

Member

@lesteve lesteve deleted the sge branch April 13, 2018 04:52

lesteve commented Apr 13, 2018

Member Author

Nice to see this merged! I'll try to find some time to fix the adaptive problem.

@lesteve lesteve mentioned this pull request May 2, 2018
4 participants