Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Deploy Dask on Job Queueing systems
===================================

This helps to deploy Dask on batch-style job schedulers like PBS and SLURM.
This helps to deploy Dask on batch-style job schedulers like PBS, SGE and SLURM.

Example
-------
Expand Down Expand Up @@ -40,4 +40,4 @@ Original developers include the following:

- `Jim Edwards <https://github.com/jedwards4b>`_
- `Joe Hamman <https://github.com/jhamman>`_
- `Matthew ROcklin <https://github.com/mrocklin>`_
- `Matthew Rocklin <https://github.com/mrocklin>`_
8 changes: 6 additions & 2 deletions dask_jobqueue/jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class JobQueueCluster(object):
PBSCluster
SLURMCluster
"""

self.submitcmd = 'qsub'
self.cancelcmd = 'qdel'

def __init__(self):
raise NotImplemented

Expand All @@ -39,7 +43,7 @@ def start_workers(self, n=1):
workers = []
for _ in range(n):
with self.job_file() as fn:
out = self._call([self._submitcmd, fn])
out = self._call([self.submitcmd, fn])
job = out.decode().split('.')[0]
self.jobs[self.n] = job
workers.append(self.n)
Expand Down Expand Up @@ -99,7 +103,7 @@ def stop_workers(self, workers):
return
workers = list(map(int, workers))
jobs = [self.jobs[w] for w in workers]
self._call([self._cancelcmd] + list(jobs))
self._call([self.cancelcmd] + list(jobs))
for w in workers:
with ignoring(KeyError):
del self.jobs[w]
Expand Down
2 changes: 0 additions & 2 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,5 @@ def __init__(self,
self.jobs = dict()
self.n = 0
self._adaptive = None
self._submitcmd = 'qsub'
self._cancelcmd = 'qdel'

logger.debug("Job script: \n %s" % self.job_script())
124 changes: 124 additions & 0 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import logging
import os
import socket
import sys

from distributed import LocalCluster
from distributed.utils import get_ip_interface

from .core import JobQueueCluster

logger = logging.getLogger(__name__)

dirname = os.path.dirname(sys.executable)


class SGECluster(JobQueueCluster):
""" Launch Dask on a SGE cluster

Parameters
----------
name : str
Name of worker jobs. Passed to `$SGE -N` option.
queue : str
Destination queue for each worker job. Passed to `#$ -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#$ -A` option.
threads_per_worker : int
Number of threads per process.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpretted both by SGE and Dask.
resource_spec : str
Request resources and specify job placement. Passed to `#$ -l`
option.
walltime : str
Walltime for each worker job.
interface : str
Network interface like 'eth0' or 'ib0'.
death_timeout : float
Seconds to wait for a scheduler before closing workers
extra : str
Additional arguments to pass to `dask-worker`
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`

Examples
--------
>>> from dask_jobqueue import SGECluster
>>> cluster = SGECluster(project='...')
>>> cluster.start_workers(10) # this may take a few seconds to launch

>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters. This automatically launches and
kill workers based on load.

>>> cluster.adapt()
"""
def __init__(self,
name='dask',
queue='default.q',
project=None,
resource_spec='h_vmem=36GB',
threads_per_worker=4,
processes=9,
memory='7GB',
walltime='0:30:0',
interface=None,
death_timeout=60,
extra='',
**kwargs):
self._template = """
#!/bin/bash

#$ -N %(name)s
#$ -q %(queue)s
#$ -P %(project)s
#$ -l %(resource_spec)s
#$ -l h_rt=%(walltime)s
#$ -cwd
#$ -j y

%(base_path)s/dask-worker %(scheduler)s \
--nthreads %(threads_per_worker)d \
--nprocs %(processes)s \
--memory-limit %(memory)s \
--name %(name)s-%(n)d \
--death-timeout %(death_timeout)s \
%(extra)s
""".lstrip()

if interface:
host = get_ip_interface(interface)
extra += ' --interface %s ' % interface
else:
host = socket.gethostname()

project = project or os.environ.get('SGE_ACCOUNT')
if not project:
raise ValueError("Must specify a project like `project='UCLB1234' "
"or set SGE_ACCOUNT environment variable")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on #7 ?

self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
memory = memory.replace(' ', '')
self.config = {'name': name,
'queue': queue,
'project': project,
'threads_per_worker': threads_per_worker,
'processes': processes,
'walltime': walltime,
'scheduler': self.scheduler.address,
'resource_spec': resource_spec,
'base_path': dirname,
'memory': memory,
'death_timeout': death_timeout,
'extra': extra}
self.jobs = dict()
self.n = 0
self._adaptive = None

logger.debug("Job script: \n %s" % self.job_script())
4 changes: 2 additions & 2 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(self,
self.jobs = dict()
self.n = 0
self._adaptive = None
self._submitcmd = 'sbatch'
self._cancelcmd = 'scancel'
self.submitcmd = 'sbatch'
self.cancelcmd = 'scancel'

logger.debug("Job script: \n %s" % self.job_script())
67 changes: 67 additions & 0 deletions dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
from time import time, sleep

import pytest

from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
from pangeo import SGECluster


def test_basic(loop):
with SGECluster(walltime='00:02:00', threads_per_worker=2, memory='7GB',
interface='ib0', loop=loop) as cluster:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface keyword option here will likely fail on a CI system.

with Client(cluster) as client:
workers = cluster.start_workers(2)
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11
assert cluster.jobs

info = client.scheduler_info()
w = list(info['workers'].values())[0]
assert w['memory_limit'] == 7e9
assert w['ncores'] == 2

cluster.stop_workers(workers)

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10

assert not cluster.jobs


def test_adaptive(loop):
with SGECluster(walltime='00:02:00', loop=loop) as cluster:
cluster.adapt()
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11

assert cluster.jobs

start = time()
while len(client.scheduler_info()['workers']) != cluster.config['processes']:
sleep(0.1)
assert time() < start + 10

del future

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10

start = time()
while cluster.jobs:
sleep(0.100)
assert time() < start + 10


@pytest.mark.skipif('SGE_ACCOUNT' in os.environ, reason='SGE_ACCOUNT defined')
def test_errors(loop):
with pytest.raises(ValueError) as info:
SGECluster()

assert 'project=' in str(info.value)