Skip to content
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .core import JobQueueCluster
from .pbs import PBSCluster
from .slurm import SLURMCluster
from .sge import SGECluster
6 changes: 4 additions & 2 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import socket
import os
import sys
import shlex

import docrep

from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes
Expand Down Expand Up @@ -153,8 +155,8 @@ def start_workers(self, n=1):
workers = []
for _ in range(n):
with self.job_file() as fn:
out = self._call([self.submit_command, fn])
job = out.decode().split('.')[0]
out = self._call(shlex.split(self.submit_command) + [fn])

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know shlex, is this mandatory? It serves what purpose?

@lesteve lesteve Apr 12, 2018

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In SGE, by default qsub returns quite a verbose output, e.g. something like Your job 56 ("test.sh") has been submitted. In order for just the job id to be returned, you need to use qsub -terse. This is why submit_cmd = 'qsub -terse'.

That means that you need to split submit_cmd. I think shlex.split is the way to do it for sh commands. We could just do submit_cmd.split(' ') but it may break e.g. if one of the arguments is quoted with a space inside.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok that's perfect then!

job = out.decode().split('.')[0].strip()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And by the way the .strip() here is necessary because qsub -terse output finishes with a \n.

self.jobs[self.n] = job
workers.append(self.n)
return workers
Expand Down
77 changes: 77 additions & 0 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


@docstrings.with_indent(4)
class SGECluster(JobQueueCluster):
""" Launch Dask on a SGE cluster

Parameters
----------
queue : str
Destination queue for each worker job. Passed to `#$ -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#$ -A` option.
resource_spec : str
Request resources and specify job placement. Passed to `#$ -l`
option.
walltime : str
Walltime for each worker job.
%(JobQueueCluster.parameters)s

Examples
--------
>>> from dask_jobqueue import SGECluster
>>> cluster = SGECluster(queue='regular')
>>> cluster.start_workers(10) # this may take a few seconds to launch

>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters. This automatically launches and
kill workers based on load.

>>> cluster.adapt()
"""

#Override class variables
submit_command = 'qsub -terse'
cancel_command = 'qdel'

def __init__(self,
queue=None,
project=None,
resource_spec=None,
walltime='0:30:00',
**kwargs):

super(SGECluster, self).__init__(**kwargs)

header_lines = ['#!/bin/bash']

if self.name is not None:
header_lines.append('#$ -N %(name)s')
if queue is not None:
header_lines.append('#$ -q %(queue)s')
if project is not None:
header_lines.append('#$ -P %(project)s')
if resource_spec is not None:
header_lines.append('#$ -l %(resource_spec)s')
if walltime is not None:
header_lines.append('#$ -l h_rt=%(walltime)s')
header_lines.extend(['#$ -cwd', '#$ -j y'])
header_template = '\n'.join(header_lines)

config = {'name': self.name,
'queue': queue,
'project': project,
'processes': self.worker_processes,
'walltime': walltime,
'resource_spec': resource_spec,}
self.job_header = header_template % config

logger.debug("Job script: \n %s" % self.job_script())
31 changes: 28 additions & 3 deletions dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
from time import time, sleep

import pytest

from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401

from dask_jobqueue import SGECluster

pytestmark = pytest.mark.env("sge")


def test_sge_placeholder():
# to test that CI is working
pass
def test_basic(loop): # noqa: F811
with SGECluster(walltime='00:02:00', threads=2, memory='7GB',
loop=loop) as cluster:
with Client(cluster, loop=loop) as client:
workers = cluster.start_workers(2)
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11
assert cluster.jobs

info = client.scheduler_info()
w = list(info['workers'].values())[0]
assert w['memory_limit'] == 7e9
assert w['ncores'] == 2

cluster.stop_workers(workers)

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10

assert not cluster.jobs