diff --git a/ci/none.sh b/ci/none.sh index 1c67925e..fb1f9647 100644 --- a/ci/none.sh +++ b/ci/none.sh @@ -1,7 +1,5 @@ #!/usr/bin/env bash -set -x - function jobqueue_before_install { # Install miniconda ./ci/conda_setup.sh diff --git a/ci/pbs.sh b/ci/pbs.sh index 715c45d1..d35465ab 100644 --- a/ci/pbs.sh +++ b/ci/pbs.sh @@ -1,7 +1,5 @@ #!/usr/bin/env bash -set -x - function jobqueue_before_install { docker version docker-compose version @@ -17,7 +15,7 @@ function jobqueue_before_install { } function jobqueue_install { - docker exec -it pbs_master /bin/bash -c "cd /dask-jobqueue; python setup.py install" + docker exec -it pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e ." } function jobqueue_script { diff --git a/ci/sge.sh b/ci/sge.sh index c4ed8a54..50cf5518 100644 --- a/ci/sge.sh +++ b/ci/sge.sh @@ -1,7 +1,5 @@ #!/usr/bin/env bash -set -x - function jobqueue_before_install { docker version docker-compose version @@ -16,7 +14,7 @@ function jobqueue_before_install { } function jobqueue_install { - docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; python setup.py install" + docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; pip install -e ." } function jobqueue_script { diff --git a/ci/slurm.sh b/ci/slurm.sh index d61a89b4..cdc88852 100644 --- a/ci/slurm.sh +++ b/ci/slurm.sh @@ -1,7 +1,5 @@ #!/usr/bin/env bash -set -x - function jobqueue_before_install { docker version docker-compose version @@ -16,7 +14,7 @@ function jobqueue_before_install { } function jobqueue_install { - docker exec -it slurmctld /bin/bash -c "cd /dask-jobqueue; python setup.py install" + docker exec -it slurmctld /bin/bash -c "cd /dask-jobqueue; pip install -e ." } function jobqueue_script { diff --git a/dask_jobqueue/config.py b/dask_jobqueue/config.py index 7ab63bb3..f27b2a04 100644 --- a/dask_jobqueue/config.py +++ b/dask_jobqueue/config.py @@ -1,11 +1,10 @@ -from __future__ import print_function, division, absolute_import +from __future__ import absolute_import, division, print_function import os import dask import yaml - fn = os.path.join(os.path.dirname(__file__), 'jobqueue.yaml') dask.config.ensure_file(source=fn) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index ab4b6024..e24f2bfd 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,17 +1,22 @@ -from contextlib import contextmanager +from __future__ import absolute_import, division, print_function + import logging +import math import shlex import socket import subprocess import sys import warnings +from collections import OrderedDict +from contextlib import contextmanager import dask import docrep from distributed import LocalCluster from distributed.deploy import Cluster -from distributed.utils import (get_ip_interface, ignoring, parse_bytes, tmpfile, - format_bytes) +from distributed.diagnostics.plugin import SchedulerPlugin +from distributed.utils import ( + format_bytes, get_ip_interface, parse_bytes, tmpfile) logger = logging.getLogger(__name__) docstrings = docrep.DocstringProcessor() @@ -28,6 +33,54 @@ """.strip() +def _job_id_from_worker_name(name): + ''' utility to parse the job ID from the worker name + + template: 'prefix--jobid--suffix' + ''' + _, job_id, _ = name.split('--') + return job_id + + +class JobQueuePlugin(SchedulerPlugin): + def __init__(self): + self.pending_jobs = OrderedDict() + self.running_jobs = OrderedDict() + self.finished_jobs = OrderedDict() + self.all_workers = {} + + def add_worker(self, scheduler, worker=None, name=None, **kwargs): + ''' Run when a new worker enters the cluster''' + logger.debug("adding worker %s" % worker) + w = scheduler.workers[worker] + job_id = _job_id_from_worker_name(w.name) + logger.debug("job id for new worker: %s" % job_id) + self.all_workers[worker] = (w.name, job_id) + + # if this is the first worker for this job, move job to running + if job_id not in self.running_jobs: + logger.debug("this is a new job") + self.running_jobs[job_id] = self.pending_jobs.pop(job_id) + + # add worker to dict of workers in this job + self.running_jobs[job_id][w.name] = w + + def remove_worker(self, scheduler=None, worker=None, **kwargs): + ''' Run when a worker leaves the cluster''' + logger.debug("removing worker %s" % worker) + name, job_id = self.all_workers[worker] + logger.debug("removing worker name (%s) and" + "job_id (%s)" % (name, job_id)) + + # remove worker from this job + del self.running_jobs[job_id][name] + + # once there are no more workers, move this job to finished_jobs + if not self.running_jobs[job_id]: + logger.debug("that was the last worker for job %s" % job_id) + self.finished_jobs[job_id] = self.running_jobs.pop(job_id) + + @docstrings.get_sectionsf('JobQueueCluster') class JobQueueCluster(Cluster): """ Base class to launch Dask Clusters for Job queues @@ -87,6 +140,8 @@ class JobQueueCluster(Cluster): submit_command = None cancel_command = None scheduler_name = '' + _adaptive_options = { + 'worker_key': lambda ws: _job_id_from_worker_name(ws.name)} def __init__(self, name=None, @@ -155,15 +210,17 @@ def __init__(self, self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - # Keep information on process, cores, and memory, for use in subclasses - self.worker_memory = parse_bytes(memory) - + # Keep information on process, threads and memory, for use in + # subclasses + self.worker_memory = parse_bytes(memory) if memory is not None else None self.worker_processes = processes self.worker_cores = cores self.name = name - self.jobs = dict() - self.n = 0 + # plugin for tracking job status + self._scheduler_plugin = JobQueuePlugin() + self.local_cluster.scheduler.add_plugin(self._scheduler_plugin) + self._adaptive = None self._env_header = '\n'.join(env_extra) @@ -179,10 +236,8 @@ def __init__(self, mem = format_bytes(self.worker_memory / self.worker_processes) mem = mem.replace(' ', '') self._command_template += " --memory-limit %s" % mem + self._command_template += " --name %s--${JOB_ID}--" % name - if name is not None: - self._command_template += " --name %s" % name - self._command_template += "-%(n)d" # Keep %(n) to be replaced later if death_timeout is not None: self._command_template += " --death-timeout %s" % death_timeout if local_directory is not None: @@ -190,36 +245,51 @@ def __init__(self, if extra is not None: self._command_template += extra + @property + def pending_jobs(self): + """ Jobs pending in the queue """ + return self._scheduler_plugin.pending_jobs + + @property + def running_jobs(self): + """ Jobs with currenly active workers """ + return self._scheduler_plugin.running_jobs + + @property + def finished_jobs(self): + """ Jobs that have finished """ + return self._scheduler_plugin.finished_jobs + @property def worker_threads(self): return int(self.worker_cores / self.worker_processes) def job_script(self): """ Construct a job submission script """ - self.n += 1 - template = self._command_template % {'n': self.n} - return self._script_template % {'job_header': self.job_header, - 'env_header': self._env_header, - 'worker_command': template} + pieces = {'job_header': self.job_header, + 'env_header': self._env_header, + 'worker_command': self._command_template} + return self._script_template % pieces @contextmanager def job_file(self): """ Write job submission script to temporary file """ with tmpfile(extension='sh') as fn: with open(fn, 'w') as f: + logger.debug("writing job script: \n%s" % self.job_script()) f.write(self.job_script()) yield fn def start_workers(self, n=1): """ Start workers and point them to our local scheduler """ - workers = [] - for _ in range(n): + logger.debug('starting %s workers' % n) + num_jobs = math.ceil(n / self.worker_processes) + for _ in range(num_jobs): with self.job_file() as fn: out = self._call(shlex.split(self.submit_command) + [fn]) job = self._job_id_from_submit_output(out.decode()) - self.jobs[self.n] = job - workers.append(self.n) - return workers + logger.debug("started job: %s" % job) + self.pending_jobs[job] = {} @property def scheduler(self): @@ -248,12 +318,12 @@ def _calls(self, cmds): Also logs any stderr information """ logger.debug("Submitting the following calls to command line") + procs = [] for cmd in cmds: logger.debug(' '.join(cmd)) - procs = [subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - for cmd in cmds] + procs.append(subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE)) result = [] for proc in procs: @@ -269,33 +339,60 @@ def _call(self, cmd): def stop_workers(self, workers): """ Stop a list of workers""" + logger.debug("Stopping workers: %s" % workers) if not workers: return - workers = list(map(int, workers)) - jobs = [self.jobs[w] for w in workers] - self._call([self.cancel_command] + list(jobs)) + jobs = self._stop_pending_jobs() # stop pending jobs too for w in workers: - with ignoring(KeyError): - del self.jobs[w] + if isinstance(w, dict): + jobs.append(_job_id_from_worker_name(w['name'])) + else: + jobs.append(_job_id_from_worker_name(w.name)) + self.stop_jobs(set(jobs)) + + def stop_jobs(self, jobs): + """ Stop a list of jobs""" + logger.debug("Stopping jobs: %s" % jobs) + if jobs: + jobs = list(jobs) + self._call([self.cancel_command] + list(set(jobs))) def scale_up(self, n, **kwargs): """ Brings total worker count up to ``n`` """ - return self.start_workers(n - len(self.jobs)) + logger.debug("Scaling up to %d workers." % n) + active_and_pending = sum([len(j) for j in self.running_jobs.values()]) + active_and_pending += self.worker_processes * len(self.pending_jobs) + logger.debug("Found %d active/pending workers." % active_and_pending) + self.start_workers(n - active_and_pending) def scale_down(self, workers): ''' Close the workers with the given addresses ''' - if isinstance(workers, dict): - names = {v['name'] for v in workers.values()} - job_ids = {name.split('-')[-2] for name in names} - self.stop_workers(job_ids) + logger.debug("Scaling down. Workers: %s" % workers) + worker_states = [] + for w in workers: + try: + # Get the actual WorkerState + worker_states.append(self.scheduler.workers[w]) + except KeyError: + logger.debug('worker %s is already gone' % w) + self.stop_workers(worker_states) def __enter__(self): return self def __exit__(self, type, value, traceback): - self.stop_workers(self.jobs) + jobs = self._stop_pending_jobs() + jobs += list(self.running_jobs.keys()) + self.stop_jobs(set(jobs)) self.local_cluster.__exit__(type, value, traceback) + def _stop_pending_jobs(self): + jobs = list(self.pending_jobs.keys()) + logger.debug("Stopping pending jobs %s" % jobs) + for job_id in jobs: + del self.pending_jobs[job_id] + return jobs + def _job_id_from_submit_output(self, out): raise NotImplementedError('_job_id_from_submit_output must be ' 'implemented when JobQueueCluster is ' diff --git a/dask_jobqueue/moab.py b/dask_jobqueue/moab.py index 7fbc0bc6..36e06595 100644 --- a/dask_jobqueue/moab.py +++ b/dask_jobqueue/moab.py @@ -30,7 +30,7 @@ class MoabCluster(PBSCluster): memory='16G', resource_spec='96G', job_extra=['-d /home/First.Last', '-M none'], local_directory=os.getenv('TMPDIR', '/tmp')) - >>> cluster.start_workers(10) # this may take a few seconds to launch + >>> cluster.start_workers(10) # submit enough jobs to deploy 10 workers >>> from dask.distributed import Client >>> client = Client(cluster) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 6545b8dc..a7f25730 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -75,7 +75,10 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, # Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(**kwargs) - header_lines = [] + # Try to find a project name from environment variable + project = project or os.environ.get('PBS_ACCOUNT') + + header_lines = ['#!/usr/bin/env bash'] # PBS header build if self.name is not None: header_lines.append('#PBS -N %s' % self.name) @@ -95,6 +98,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, if walltime is not None: header_lines.append('#PBS -l walltime=%s' % walltime) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) + header_lines.append('JOB_ID=${PBS_JOBID%.*}') # Declare class attribute that shall be overriden self.job_header = '\n'.join(header_lines) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index a7492f3f..1c49dfe7 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import, division, print_function + import logging import dask @@ -56,8 +58,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, super(SGECluster, self).__init__(**kwargs) - header_lines = ['#!/bin/bash'] - + header_lines = ['#!/usr/bin/env bash'] if self.name is not None: header_lines.append('#$ -N %(name)s') if queue is not None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 00291b0b..6ffbe79a 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -51,7 +51,7 @@ class SLURMCluster(JobQueueCluster): >>> cluster.adapt() """, 4) - #Override class variables + # Override class variables submit_command = 'sbatch --parsable' cancel_command = 'scancel' scheduler_name = 'slurm' @@ -74,7 +74,7 @@ def __init__(self, queue=None, project=None, walltime=None, super(SLURMCluster, self).__init__(**kwargs) # Always ask for only one task - header_lines = [] + header_lines = ['#!/usr/bin/env bash'] # SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) @@ -100,6 +100,8 @@ def __init__(self, queue=None, project=None, walltime=None, header_lines.append('#SBATCH -t %s' % walltime) header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) + header_lines.append('JOB_ID=${SLURM_JOB_ID%;*}') + # Declare class attribute that shall be overriden self.job_header = '\n'.join(header_lines) diff --git a/dask_jobqueue/tests/__init__.py b/dask_jobqueue/tests/__init__.py new file mode 100644 index 00000000..575ccfb2 --- /dev/null +++ b/dask_jobqueue/tests/__init__.py @@ -0,0 +1,2 @@ + +QUEUE_WAIT = 60 # seconds diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index aa1701e7..4374fba3 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -3,11 +3,6 @@ from dask_jobqueue import JobQueueCluster -def test_jq_core_placeholder(): - # to test that CI is working - pass - - def test_errors(): with pytest.raises(NotImplementedError) as info: JobQueueCluster(cores=4) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 57a92345..d9a2f08b 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -2,16 +2,19 @@ from time import sleep, time import dask +import pytest from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -import pytest -from dask_jobqueue import PBSCluster, MoabCluster +from dask_jobqueue import MoabCluster, PBSCluster + +from . import QUEUE_WAIT @pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster]) def test_header(Cluster): - with Cluster(walltime='00:02:00', processes=4, cores=8, memory='28GB') as cluster: + with Cluster(walltime='00:02:00', processes=4, cores=8, memory='28GB', + name='dask-worker') as cluster: assert '#PBS' in cluster.job_header assert '#PBS -N dask-worker' in cluster.job_header @@ -19,6 +22,7 @@ def test_header(Cluster): assert '#PBS -l walltime=00:02:00' in cluster.job_header assert '#PBS -q' not in cluster.job_header assert '#PBS -A' not in cluster.job_header + assert '--name dask-worker--${JOB_ID}--' in cluster.job_script() with Cluster(queue='regular', project='DaskOnPBS', processes=4, cores=8, memory='28GB', resource_spec='select=1:ncpus=24:mem=100GB') as cluster: @@ -84,24 +88,25 @@ def test_basic(loop): with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', job_extra=['-V'], loop=loop) as cluster: with Client(cluster) as client: - workers = cluster.start_workers(2) + cluster.start_workers(2) + assert cluster.pending_jobs or cluster.running_jobs future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + assert future.result(QUEUE_WAIT) == 11 + assert cluster.running_jobs - info = client.scheduler_info() - w = list(info['workers'].values())[0] + workers = list(client.scheduler_info()['workers'].values()) + w = workers[0] assert w['memory_limit'] == 2e9 assert w['ncores'] == 2 cluster.stop_workers(workers) start = time() - while len(client.scheduler_info()['workers']) > 0: + while client.scheduler_info()['workers']: sleep(0.100) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT - assert not cluster.jobs + assert not cluster.running_jobs @pytest.mark.env("pbs") # noqa: F811 @@ -111,29 +116,59 @@ def test_adaptive(loop): cluster.adapt() with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + start = time() + while not (cluster.pending_jobs or cluster.running_jobs): + sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert future.result(QUEUE_WAIT) == 11 start = time() processes = cluster.worker_processes while len(client.scheduler_info()['workers']) != processes: sleep(0.1) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT del future start = time() while len(client.scheduler_info()['workers']) > 0: sleep(0.100) - assert time() < start + 10 - - # There is probably a bug to fix in the adaptive methods of the JobQueueCluster - # Currently cluster.jobs is not cleaned up. - #start = time() - #while cluster.jobs: - # sleep(0.100) - # assert time() < start + 10 + assert time() < start + QUEUE_WAIT + + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + assert cluster.finished_jobs + + +@pytest.mark.env("pbs") # noqa: F811 +def test_adaptive_grouped(loop): + with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', + local_directory='/tmp', job_extra=['-V'], + loop=loop) as cluster: + cluster.adapt(minimum=1) # at least 1 worker + with Client(cluster) as client: + start = time() + while not (cluster.pending_jobs or cluster.running_jobs): + sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + while not cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + + start = time() + processes = cluster.worker_processes + while len(client.scheduler_info()['workers']) != processes: + sleep(0.1) + assert time() < start + QUEUE_WAIT def test_config(loop): # noqa: F811 diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 0afc71b8..ffdd6760 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -6,28 +6,32 @@ from dask_jobqueue import SGECluster -pytestmark = pytest.mark.env("sge") +from . import QUEUE_WAIT +@pytest.mark.env("sge") # noqa: F811 def test_basic(loop): # noqa: F811 - with SGECluster(walltime='00:02:00', cores=8, processes=4, memory='28GB', + with SGECluster(walltime='00:02:00', cores=8, processes=4, memory='2GB', loop=loop) as cluster: + print(cluster.job_script()) with Client(cluster, loop=loop) as client: - workers = cluster.start_workers(2) + cluster.start_workers(2) + assert cluster.pending_jobs or cluster.running_jobs + future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + assert future.result(QUEUE_WAIT) == 11 + assert cluster.running_jobs - info = client.scheduler_info() - for w in info['workers'].values(): - assert w['memory_limit'] == 7e9 - assert w['ncores'] == 2 + workers = list(client.scheduler_info()['workers'].values()) + w = workers[0] + assert w['memory_limit'] == 2e9 / 4 + assert w['ncores'] == 2 cluster.stop_workers(workers) start = time() - while len(client.scheduler_info()['workers']) > 0: + while client.scheduler_info()['workers']: sleep(0.100) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT - assert not cluster.jobs + assert not cluster.running_jobs diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 2f30db13..dc31d821 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,12 +1,14 @@ +import sys from time import sleep, time import pytest -import sys from distributed import Client from distributed.utils_test import loop # noqa: F401 from dask_jobqueue import SLURMCluster +from . import QUEUE_WAIT + def test_header(): with SLURMCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB') as cluster: @@ -20,14 +22,14 @@ def test_header(): assert '#SBATCH -p' not in cluster.job_header assert '#SBATCH -A' not in cluster.job_header - with SLURMCluster(queue='regular', project='DaskOnPBS', processes=4, + with SLURMCluster(queue='regular', project='DaskOnSlurm', processes=4, cores=8, memory='28GB', job_cpu=16, job_mem='100G') as cluster: assert '#SBATCH --cpus-per-task=16' in cluster.job_header assert '#SBATCH --cpus-per-task=8' not in cluster.job_header assert '#SBATCH --mem=100G' in cluster.job_header assert '#SBATCH -t ' in cluster.job_header - assert '#SBATCH -A DaskOnPBS' in cluster.job_header + assert '#SBATCH -A DaskOnSlurm' in cluster.job_header assert '#SBATCH -p regular' in cluster.job_header with SLURMCluster(cores=4, memory='8GB') as cluster: @@ -86,56 +88,60 @@ def test_job_script(): @pytest.mark.env("slurm") # noqa: F811 def test_basic(loop): - with SLURMCluster(walltime='00:02:00', cores=2, processes=1, memory='4GB', + with SLURMCluster(walltime='00:02:00', cores=2, processes=1, memory='2GB', job_extra=['-D /'], loop=loop) as cluster: with Client(cluster) as client: - workers = cluster.start_workers(2) + cluster.start_workers(2) + assert cluster.pending_jobs or cluster.running_jobs future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + assert future.result(QUEUE_WAIT) == 11 + assert cluster.running_jobs - info = client.scheduler_info() - w = list(info['workers'].values())[0] - assert w['memory_limit'] == 4e9 + workers = list(client.scheduler_info()['workers'].values()) + w = workers[0] + assert w['memory_limit'] == 2e9 assert w['ncores'] == 2 cluster.stop_workers(workers) start = time() - while len(client.scheduler_info()['workers']) > 0: + while client.scheduler_info()['workers']: sleep(0.100) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT - assert not cluster.jobs + assert not cluster.running_jobs @pytest.mark.env("slurm") # noqa: F811 def test_adaptive(loop): - with SLURMCluster(walltime='00:02:00', cores=2, processes=1, memory='4GB', + with SLURMCluster(walltime='00:02:00', cores=2, processes=1, memory='2GB', job_extra=['-D /'], loop=loop) as cluster: cluster.adapt() with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + start = time() + while not (cluster.pending_jobs or cluster.running_jobs): + sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert future.result(QUEUE_WAIT) == 11 start = time() processes = cluster.worker_processes - while (len(client.scheduler_info()['workers']) != processes): + while len(client.scheduler_info()['workers']) != processes: sleep(0.1) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT del future start = time() while len(client.scheduler_info()['workers']) > 0: sleep(0.100) - assert time() < start + 10 - - # There is probably a bug to fix in the adaptive methods of the JobQueueCluster - # Currently cluster.jobs is not cleaned up. - # start = time() - # while cluster.jobs: - # sleep(0.100) - # assert time() < start + 10 + assert time() < start + QUEUE_WAIT + + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + assert cluster.finished_jobs diff --git a/docs/conf.py b/docs/conf.py index 33f5c430..4b091427 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -24,9 +24,8 @@ copyright = u'2018, Anaconda, Inc. and contributors' -# The short X.Y version. -# version = '0.1.0' from dask_jobqueue import __version__ as version + # The full version, including alpha/beta/rc tags. # release = '0.1.0' release = version diff --git a/docs/index.rst b/docs/index.rst index 8009bbc0..e585fa64 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -17,7 +17,7 @@ Example from dask_jobqueue import PBSCluster cluster = PBSCluster() - cluster.scale(10) # Ask for ten jobs + cluster.scale(10) # Ask for ten workers from dask.distributed import Client client = Client(cluster) # Connect this local process to remote workers @@ -37,7 +37,7 @@ save resources when not actively computing. .. code-block:: python - cluster.adapt(minimum=1, maximum=100) + cluster.adapt(minimum=6, maximum=90) # auto-scale between 6 and 90 workers Configuration @@ -76,7 +76,7 @@ will specify how many jobs to deploy using the scale method. .. code-block:: python - cluster.scale(20) # launch twenty jobs of the specification provided above + cluster.scale(12) # launch 12 workers (2 jobs of 6 workers each) of the specification provided above Configuration Files ~~~~~~~~~~~~~~~~~~~ @@ -163,7 +163,7 @@ You then ask for more workers using the ``scale`` command: .. code-block:: python - cluster.scale(10) + cluster.scale(36) The cluster generates a traditional job script and submits that an appropriate number of times to the job queue. You can see the job script that it will @@ -201,3 +201,12 @@ When the cluster object goes away, either because you delete it or because you close your Python program, it will send a signal to the workers to shut down. If for some reason this signal does not get through then workers will kill themselves after 60 seconds of waiting for a non-existent scheduler. + +Workers vs Jobs +--------------- + +In dask-distributed, a ``Worker`` is a Python object and node in a dask +``Cluster`` that serves two purposes, 1) serve data, and 2) perform +computations. ``Jobs`` are resources submitted to, and managed by, the job +queueing system (e.g. PBS, SGE, etc.). In dask-jobqueue, a single ``Job`` may +include one or more ``Workers``.