From 4392a0bea874d8659c8bdcf9b438dcbdcd91cdca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 29 Mar 2018 07:56:36 +0200 Subject: [PATCH 01/14] First attempt at SGE implementation * use submit_command="qsub -terse" to make sure only the job id is returned. --- dask_jobqueue/core.py | 5 +- dask_jobqueue/sge.py | 139 ++++++++++++++++++++++++++++++++ dask_jobqueue/tests/test_sge.py | 67 ++++++++++++++- 3 files changed, 206 insertions(+), 5 deletions(-) create mode 100644 dask_jobqueue/sge.py diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 7806c937..08cdca00 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -5,6 +5,7 @@ import os import sys import docrep +import shlex from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes from distributed import LocalCluster @@ -153,8 +154,8 @@ def start_workers(self, n=1): workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call([self.submit_command, fn]) - job = out.decode().split('.')[0] + out = self._call(shlex.split(self.submit_command) + [fn]) + job = out.decode().split('.')[0].strip() self.jobs[self.n] = job workers.append(self.n) return workers diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py new file mode 100644 index 00000000..265cc322 --- /dev/null +++ b/dask_jobqueue/sge.py @@ -0,0 +1,139 @@ +import logging +import os +import socket +import sys + +from distributed import LocalCluster +from distributed.utils import get_ip_interface + +from .core import JobQueueCluster + +logger = logging.getLogger(__name__) + +dirname = os.path.dirname(sys.executable) + + +class SGECluster(JobQueueCluster): + """ Launch Dask on a SGE cluster + + Parameters + ---------- + name : str + Name of worker jobs. Passed to `$SGE -N` option. + queue : str + Destination queue for each worker job. Passed to `#$ -q` option. + project : str + Accounting string associated with each worker job. Passed to + `#$ -A` option. + threads_per_worker : int + Number of threads per process. + processes : int + Number of processes per node. + memory : str + Bytes of memory that the worker can use. This should be a string + like "7GB" that can be interpretted both by SGE and Dask. + resource_spec : str + Request resources and specify job placement. Passed to `#$ -l` + option. + walltime : str + Walltime for each worker job. + interface : str + Network interface like 'eth0' or 'ib0'. + death_timeout : float + Seconds to wait for a scheduler before closing workers + extra : str + Additional arguments to pass to `dask-worker` + kwargs : dict + Additional keyword arguments to pass to `LocalCluster` + + Examples + -------- + >>> from dask_jobqueue import SGECluster + >>> cluster = SGECluster(project='...') + >>> cluster.start_workers(10) # this may take a few seconds to launch + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and + kill workers based on load. + + >>> cluster.adapt() + """ + + #Override class variables + submit_command = 'qsub -terse' + cancel_command = 'qdel' + + def __init__(self, + queue=None, + project=None, + resource_spec='h_vmem=36G', + walltime='0:30:0', + interface=None, + **kwargs): + + super(SGECluster, self).__init__(**kwargs) + +# self._header_template = """ +# #!/bin/bash + +# #$ -N %(name)s +# #$ -q %(queue)s +# #$ -P %(project)s +# #$ -l %(resource_spec)s +# #$ -l h_rt=%(walltime)s +# #$ -cwd +# #$ -j y + +# """.lstrip() + + # if interface: + # host = get_ip_interface(interface) + # extra += ' --interface %s ' % interface + # else: + # host = socket.gethostname() + + project = project or os.environ.get('SGE_ACCOUNT') + + header_lines = ['#!/bin/bash'] + + # XXX: change the getattr when this is fixed in master + if getattr(self, 'name', None) is not None: + header_lines.append('#$ -N %(name)s') + if queue is not None: + header_lines.append('#$ -q %(queue)s') + if project is not None: + header_lines.append('#$ -P %(project)s') + if resource_spec is not None: + # TODO clever default like in pbs??? + header_lines.append('#$ -l %(resource_spec)s') + if walltime is not None: + header_lines.append('#$ -l h_rt=%(walltime)s') + header_lines.extend(['#$ -cwd', '#$ -j y']) + self._header_template = '\n'.join(header_lines) + + # if not project: + # raise ValueError("Must specify a project like `project='UCLB1234' " + # "or set SGE_ACCOUNT environment variable") + # self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + # memory = memory.replace(' ', '') + self.config = {'name': getattr(self, 'name', 'default-name'), + 'queue': queue, + 'project': project, + # 'threads_per_worker': threads_per_worker, + 'processes': self.worker_processes, + 'walltime': walltime, + # 'scheduler': self.scheduler.address, + 'resource_spec': resource_spec, + # 'base_path': dirname, + # 'memory': memory, + # 'death_timeout': death_timeout, + # 'extra': extra + } + self.job_header = self._header_template % self.config + # self.jobs = dict() + # self.n = 0 + # self._adaptive = None + + logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 15e984c8..52fab8bc 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -1,9 +1,70 @@ +import os +from time import time, sleep import pytest +from dask.distributed import Client +from distributed.utils_test import loop # noqa: F401 + +from dask_jobqueue.sge import SGECluster + pytestmark = pytest.mark.env("sge") -def test_sge_placeholder(): - # to test that CI is working - pass +def test_basic(loop): # noqa: F811 + with SGECluster(walltime='00:02:00', threads=2, memory='7GB', + loop=loop) as cluster: + with Client(cluster) as client: + workers = cluster.start_workers(2) + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + assert cluster.jobs + + info = client.scheduler_info() + w = list(info['workers'].values())[0] + assert w['memory_limit'] == 7e9 + assert w['ncores'] == 2 + + cluster.stop_workers(workers) + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + assert not cluster.jobs + + +def test_adaptive(loop): # noqa: F811 + with SGECluster(walltime='00:02:00', loop=loop) as cluster: + cluster.adapt() + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + + assert cluster.jobs + + start = time() + while len(client.scheduler_info()['workers']) != cluster.config['processes']: + sleep(0.1) + assert time() < start + 10 + + del future + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + start = time() + while cluster.jobs: + sleep(0.100) + assert time() < start + 10 + + +@pytest.mark.skipif('SGE_ACCOUNT' in os.environ, reason='SGE_ACCOUNT defined') # noqa: F811 +def test_errors(loop): + with pytest.raises(ValueError) as info: + SGECluster() + + assert 'project=' in str(info.value) From c83c29dc3da2c4b24a5f454be187b5b76076ca36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 16:43:47 +0200 Subject: [PATCH 02/14] Leave test_adaptive for later. Remove test_errors. PBS_ACCOUNT does not have an equivalent in SGE. --- dask_jobqueue/tests/test_sge.py | 35 --------------------------------- 1 file changed, 35 deletions(-) diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 52fab8bc..e7238396 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -33,38 +33,3 @@ def test_basic(loop): # noqa: F811 assert time() < start + 10 assert not cluster.jobs - - -def test_adaptive(loop): # noqa: F811 - with SGECluster(walltime='00:02:00', loop=loop) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - - assert cluster.jobs - - start = time() - while len(client.scheduler_info()['workers']) != cluster.config['processes']: - sleep(0.1) - assert time() < start + 10 - - del future - - start = time() - while len(client.scheduler_info()['workers']) > 0: - sleep(0.100) - assert time() < start + 10 - - start = time() - while cluster.jobs: - sleep(0.100) - assert time() < start + 10 - - -@pytest.mark.skipif('SGE_ACCOUNT' in os.environ, reason='SGE_ACCOUNT defined') # noqa: F811 -def test_errors(loop): - with pytest.raises(ValueError) as info: - SGECluster() - - assert 'project=' in str(info.value) From 27032ece72ee60b6847b42e29c4e9b0941c93551 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 16:45:04 +0200 Subject: [PATCH 03/14] Need to specify loop in the Client --- dask_jobqueue/tests/test_sge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index e7238396..da5c67d0 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -14,7 +14,7 @@ def test_basic(loop): # noqa: F811 with SGECluster(walltime='00:02:00', threads=2, memory='7GB', loop=loop) as cluster: - with Client(cluster) as client: + with Client(cluster, loop=loop) as client: workers = cluster.start_workers(2) future = client.submit(lambda x: x + 1, 10) assert future.result(60) == 11 From a7b26c8c78f2cb97a42d9af32fa3d29e4c710dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 16:45:22 +0200 Subject: [PATCH 04/14] Clean up in sge.py --- dask_jobqueue/sge.py | 34 +++------------------------------- 1 file changed, 3 insertions(+), 31 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 265cc322..be54ecaa 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -1,11 +1,7 @@ import logging import os -import socket import sys -from distributed import LocalCluster -from distributed.utils import get_ip_interface - from .core import JobQueueCluster logger = logging.getLogger(__name__) @@ -68,8 +64,8 @@ class SGECluster(JobQueueCluster): def __init__(self, queue=None, project=None, - resource_spec='h_vmem=36G', - walltime='0:30:0', + resource_spec=None, + walltime='0:30:00', interface=None, **kwargs): @@ -88,52 +84,28 @@ def __init__(self, # """.lstrip() - # if interface: - # host = get_ip_interface(interface) - # extra += ' --interface %s ' % interface - # else: - # host = socket.gethostname() - - project = project or os.environ.get('SGE_ACCOUNT') - header_lines = ['#!/bin/bash'] - # XXX: change the getattr when this is fixed in master - if getattr(self, 'name', None) is not None: + if self.name is not None: header_lines.append('#$ -N %(name)s') if queue is not None: header_lines.append('#$ -q %(queue)s') if project is not None: header_lines.append('#$ -P %(project)s') if resource_spec is not None: - # TODO clever default like in pbs??? header_lines.append('#$ -l %(resource_spec)s') if walltime is not None: header_lines.append('#$ -l h_rt=%(walltime)s') header_lines.extend(['#$ -cwd', '#$ -j y']) self._header_template = '\n'.join(header_lines) - # if not project: - # raise ValueError("Must specify a project like `project='UCLB1234' " - # "or set SGE_ACCOUNT environment variable") - # self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - # memory = memory.replace(' ', '') self.config = {'name': getattr(self, 'name', 'default-name'), 'queue': queue, 'project': project, - # 'threads_per_worker': threads_per_worker, 'processes': self.worker_processes, 'walltime': walltime, - # 'scheduler': self.scheduler.address, 'resource_spec': resource_spec, - # 'base_path': dirname, - # 'memory': memory, - # 'death_timeout': death_timeout, - # 'extra': extra } self.job_header = self._header_template % self.config - # self.jobs = dict() - # self.n = 0 - # self._adaptive = None logger.debug("Job script: \n %s" % self.job_script()) From 3013547e6559cf335b70e566a84bf772967f95f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 17:07:26 +0200 Subject: [PATCH 05/14] Fix flake8 --- dask_jobqueue/sge.py | 16 +--------------- dask_jobqueue/tests/test_sge.py | 1 - 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index be54ecaa..9ee11040 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -71,19 +71,6 @@ def __init__(self, super(SGECluster, self).__init__(**kwargs) -# self._header_template = """ -# #!/bin/bash - -# #$ -N %(name)s -# #$ -q %(queue)s -# #$ -P %(project)s -# #$ -l %(resource_spec)s -# #$ -l h_rt=%(walltime)s -# #$ -cwd -# #$ -j y - -# """.lstrip() - header_lines = ['#!/bin/bash'] if self.name is not None: @@ -104,8 +91,7 @@ def __init__(self, 'project': project, 'processes': self.worker_processes, 'walltime': walltime, - 'resource_spec': resource_spec, - } + 'resource_spec': resource_spec,} self.job_header = self._header_template % self.config logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index da5c67d0..698b50ab 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -1,4 +1,3 @@ -import os from time import time, sleep import pytest From cea22cc3360eb240411d8da8df3168ccbd5a33f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 17:10:18 +0200 Subject: [PATCH 06/14] Forgotten another self.name fixed in master --- dask_jobqueue/sge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 9ee11040..12e45a1a 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -86,7 +86,7 @@ def __init__(self, header_lines.extend(['#$ -cwd', '#$ -j y']) self._header_template = '\n'.join(header_lines) - self.config = {'name': getattr(self, 'name', 'default-name'), + self.config = {'name': self.name, 'queue': queue, 'project': project, 'processes': self.worker_processes, From 10f7c7264321c02da0b623896c278c0f26989c26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 11 Apr 2018 18:00:23 +0200 Subject: [PATCH 07/14] Retriggering CIs From 271d78c683d3f3b636d883850ef4f11be9b07d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 10:39:57 +0200 Subject: [PATCH 08/14] Clean-up * Remove interface parameters * Update docstring which was out of date --- dask_jobqueue/sge.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 12e45a1a..cda8466e 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -14,20 +14,11 @@ class SGECluster(JobQueueCluster): Parameters ---------- - name : str - Name of worker jobs. Passed to `$SGE -N` option. queue : str Destination queue for each worker job. Passed to `#$ -q` option. project : str Accounting string associated with each worker job. Passed to `#$ -A` option. - threads_per_worker : int - Number of threads per process. - processes : int - Number of processes per node. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by SGE and Dask. resource_spec : str Request resources and specify job placement. Passed to `#$ -l` option. @@ -35,17 +26,14 @@ class SGECluster(JobQueueCluster): Walltime for each worker job. interface : str Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - extra : str - Additional arguments to pass to `dask-worker` kwargs : dict Additional keyword arguments to pass to `LocalCluster` + %(JobQueueCluster.parameters)s Examples -------- >>> from dask_jobqueue import SGECluster - >>> cluster = SGECluster(project='...') + >>> cluster = SGECluster(queue='regular') >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -66,7 +54,6 @@ def __init__(self, project=None, resource_spec=None, walltime='0:30:00', - interface=None, **kwargs): super(SGECluster, self).__init__(**kwargs) From f4cf1ea04f8687d9f73189d18011b70b66d67942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 10:52:57 +0200 Subject: [PATCH 09/14] Cosmetic changes to harmonize with pbs.py --- dask_jobqueue/sge.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index cda8466e..3d085546 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -71,14 +71,14 @@ def __init__(self, if walltime is not None: header_lines.append('#$ -l h_rt=%(walltime)s') header_lines.extend(['#$ -cwd', '#$ -j y']) - self._header_template = '\n'.join(header_lines) - - self.config = {'name': self.name, - 'queue': queue, - 'project': project, - 'processes': self.worker_processes, - 'walltime': walltime, - 'resource_spec': resource_spec,} - self.job_header = self._header_template % self.config + header_template = '\n'.join(header_lines) + + config = {'name': self.name, + 'queue': queue, + 'project': project, + 'processes': self.worker_processes, + 'walltime': walltime, + 'resource_spec': resource_spec,} + self.job_header = header_template % config logger.debug("Job script: \n %s" % self.job_script()) From 222f1570f2311cb213a0fc6ccd65c57ca07e0287 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 11:35:35 +0200 Subject: [PATCH 10/14] docrep-related fixes --- dask_jobqueue/sge.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 3d085546..4ba892c1 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -2,13 +2,14 @@ import os import sys -from .core import JobQueueCluster +from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) dirname = os.path.dirname(sys.executable) +@docstrings.with_indent(4) class SGECluster(JobQueueCluster): """ Launch Dask on a SGE cluster @@ -26,8 +27,6 @@ class SGECluster(JobQueueCluster): Walltime for each worker job. interface : str Network interface like 'eth0' or 'ib0'. - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` %(JobQueueCluster.parameters)s Examples From e9d0cb836089148e4fd2bd7d2dee2f1d68b32fcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 11:38:24 +0200 Subject: [PATCH 11/14] Make SGECluster importable from dask_jobqueue --- dask_jobqueue/__init__.py | 1 + dask_jobqueue/tests/test_sge.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py index dcee8ff8..55aa52bf 100644 --- a/dask_jobqueue/__init__.py +++ b/dask_jobqueue/__init__.py @@ -2,3 +2,4 @@ from .core import JobQueueCluster from .pbs import PBSCluster from .slurm import SLURMCluster +from .sge import SGECluster diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 698b50ab..ec40a8a1 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -5,7 +5,7 @@ from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -from dask_jobqueue.sge import SGECluster +from dask_jobqueue import SGECluster pytestmark = pytest.mark.env("sge") From bcf492ba3efda992c3673b3ad7907a3212feab8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 13:15:34 +0200 Subject: [PATCH 12/14] Remove interface parameter from docstring --- dask_jobqueue/sge.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 4ba892c1..1fd92d62 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -25,8 +25,6 @@ class SGECluster(JobQueueCluster): option. walltime : str Walltime for each worker job. - interface : str - Network interface like 'eth0' or 'ib0'. %(JobQueueCluster.parameters)s Examples From a4cd9d308addf7104d1b47efea7f8bae87d13bf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 13:17:27 +0200 Subject: [PATCH 13/14] Group stdlib imports together --- dask_jobqueue/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 08cdca00..f8ad52d1 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -4,9 +4,10 @@ import socket import os import sys -import docrep import shlex +import docrep + from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes from distributed import LocalCluster from distributed.deploy import Cluster From 27067763e895a1719594b57dd3838ae44a6276e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 12 Apr 2018 13:20:21 +0200 Subject: [PATCH 14/14] Remove unused 'dirname' variable --- dask_jobqueue/sge.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 1fd92d62..1dadeb83 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -1,13 +1,9 @@ import logging -import os -import sys from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) -dirname = os.path.dirname(sys.executable) - @docstrings.with_indent(4) class SGECluster(JobQueueCluster):