From 86cb12b6b1c1218fad31732743424426c98f6c31 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Thu, 1 Mar 2018 22:11:55 -0800 Subject: [PATCH 1/3] basic sge implementation --- dask_jobqueue/sge.py | 126 ++++++++++++++++++++++++++++++++ dask_jobqueue/tests/test_sge.py | 67 +++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 dask_jobqueue/sge.py create mode 100644 dask_jobqueue/tests/test_sge.py diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py new file mode 100644 index 00000000..54951494 --- /dev/null +++ b/dask_jobqueue/sge.py @@ -0,0 +1,126 @@ +import logging +import os +import socket +import sys + +from distributed import LocalCluster +from distributed.utils import get_ip_interface + +from .core import JobQueueCluster + +logger = logging.getLogger(__name__) + +dirname = os.path.dirname(sys.executable) + + +class SGECluster(JobQueueCluster): + """ Launch Dask on a SGE cluster + + Parameters + ---------- + name : str + Name of worker jobs. Passed to `$SGE -N` option. + queue : str + Destination queue for each worker job. Passed to `#$ -q` option. + project : str + Accounting string associated with each worker job. Passed to + `#$ -A` option. + threads_per_worker : int + Number of threads per process. + processes : int + Number of processes per node. + memory : str + Bytes of memory that the worker can use. This should be a string + like "7GB" that can be interpretted both by SGE and Dask. + resource_spec : str + Request resources and specify job placement. Passed to `#$ -l` + option. + walltime : str + Walltime for each worker job. + interface : str + Network interface like 'eth0' or 'ib0'. + death_timeout : float + Seconds to wait for a scheduler before closing workers + extra : str + Additional arguments to pass to `dask-worker` + kwargs : dict + Additional keyword arguments to pass to `LocalCluster` + + Examples + -------- + >>> from dask_jobqueue import SGECluster + >>> cluster = SGECluster(project='...') + >>> cluster.start_workers(10) # this may take a few seconds to launch + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and + kill workers based on load. + + >>> cluster.adapt() + """ + def __init__(self, + name='dask', + queue='default.q', + project=None, + resource_spec='h_vmem=36GB', + threads_per_worker=4, + processes=9, + memory='7GB', + walltime='0:30:0', + interface=None, + death_timeout=60, + extra='', + **kwargs): + self._template = """ +#!/bin/bash + +#$ -N %(name)s +#$ -q %(queue)s +#$ -P %(project)s +#$ -l %(resource_spec)s +#$ -l h_rt=%(walltime)s +#$ -cwd +#$ -j y + +%(base_path)s/dask-worker %(scheduler)s \ + --nthreads %(threads_per_worker)d \ + --nprocs %(processes)s \ + --memory-limit %(memory)s \ + --name %(name)s-%(n)d \ + --death-timeout %(death_timeout)s \ + %(extra)s +""".lstrip() + + if interface: + host = get_ip_interface(interface) + extra += ' --interface %s ' % interface + else: + host = socket.gethostname() + + project = project or os.environ.get('SGE_ACCOUNT') + if not project: + raise ValueError("Must specify a project like `project='UCLB1234' " + "or set SGE_ACCOUNT environment variable") + self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + memory = memory.replace(' ', '') + self.config = {'name': name, + 'queue': queue, + 'project': project, + 'threads_per_worker': threads_per_worker, + 'processes': processes, + 'walltime': walltime, + 'scheduler': self.scheduler.address, + 'resource_spec': resource_spec, + 'base_path': dirname, + 'memory': memory, + 'death_timeout': death_timeout, + 'extra': extra} + self.jobs = dict() + self.n = 0 + self._adaptive = None + self._submitcmd = 'qsub' + self._cancelcmd = 'qdel' + + logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py new file mode 100644 index 00000000..09164840 --- /dev/null +++ b/dask_jobqueue/tests/test_sge.py @@ -0,0 +1,67 @@ +import os +from time import time, sleep + +import pytest + +from dask.distributed import Client +from distributed.utils_test import loop # noqa: F401 +from pangeo import SGECluster + + +def test_basic(loop): + with SGECluster(walltime='00:02:00', threads_per_worker=2, memory='7GB', + interface='ib0', loop=loop) as cluster: + with Client(cluster) as client: + workers = cluster.start_workers(2) + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + assert cluster.jobs + + info = client.scheduler_info() + w = list(info['workers'].values())[0] + assert w['memory_limit'] == 7e9 + assert w['ncores'] == 2 + + cluster.stop_workers(workers) + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + assert not cluster.jobs + + +def test_adaptive(loop): + with SGECluster(walltime='00:02:00', loop=loop) as cluster: + cluster.adapt() + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + + assert cluster.jobs + + start = time() + while len(client.scheduler_info()['workers']) != cluster.config['processes']: + sleep(0.1) + assert time() < start + 10 + + del future + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + start = time() + while cluster.jobs: + sleep(0.100) + assert time() < start + 10 + + +@pytest.mark.skipif('SGE_ACCOUNT' in os.environ, reason='SGE_ACCOUNT defined') +def test_errors(loop): + with pytest.raises(ValueError) as info: + SGECluster() + + assert 'project=' in str(info.value) From 0fbbd83ef610af2b31a28f4453aaa37d3f8c19dc Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 6 Mar 2018 11:17:25 -0800 Subject: [PATCH 2/3] submit/cancel commands as class variables --- dask_jobqueue/jobqueue.py | 8 ++++++-- dask_jobqueue/pbs.py | 2 -- dask_jobqueue/sge.py | 2 -- dask_jobqueue/slurm.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dask_jobqueue/jobqueue.py b/dask_jobqueue/jobqueue.py index f0ea6dc7..cd82e55d 100644 --- a/dask_jobqueue/jobqueue.py +++ b/dask_jobqueue/jobqueue.py @@ -19,6 +19,10 @@ class JobQueueCluster(object): PBSCluster SLURMCluster """ + + self.submitcmd = 'qsub' + self.cancelcmd = 'qdel' + def __init__(self): raise NotImplemented @@ -39,7 +43,7 @@ def start_workers(self, n=1): workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call([self._submitcmd, fn]) + out = self._call([self.submitcmd, fn]) job = out.decode().split('.')[0] self.jobs[self.n] = job workers.append(self.n) @@ -99,7 +103,7 @@ def stop_workers(self, workers): return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] - self._call([self._cancelcmd] + list(jobs)) + self._call([self.cancelcmd] + list(jobs)) for w in workers: with ignoring(KeyError): del self.jobs[w] diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index cf634cbd..70760518 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -119,7 +119,5 @@ def __init__(self, self.jobs = dict() self.n = 0 self._adaptive = None - self._submitcmd = 'qsub' - self._cancelcmd = 'qdel' logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 54951494..6efb860e 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -120,7 +120,5 @@ def __init__(self, self.jobs = dict() self.n = 0 self._adaptive = None - self._submitcmd = 'qsub' - self._cancelcmd = 'qdel' logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 8828239d..fb84993f 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -122,7 +122,7 @@ def __init__(self, self.jobs = dict() self.n = 0 self._adaptive = None - self._submitcmd = 'sbatch' - self._cancelcmd = 'scancel' + self.submitcmd = 'sbatch' + self.cancelcmd = 'scancel' logger.debug("Job script: \n %s" % self.job_script()) From b1e0c26daa173e052d0e75331d8e4c1dd06f42eb Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 6 Mar 2018 11:18:45 -0800 Subject: [PATCH 3/3] update readme --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index db3aee22..cec0f4a9 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,7 @@ Deploy Dask on Job Queueing systems =================================== -This helps to deploy Dask on batch-style job schedulers like PBS and SLURM. +This helps to deploy Dask on batch-style job schedulers like PBS, SGE and SLURM. Example ------- @@ -40,4 +40,4 @@ Original developers include the following: - `Jim Edwards `_ - `Joe Hamman `_ -- `Matthew ROcklin `_ +- `Matthew Rocklin `_