diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f81a4480..176bb2a3 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -11,6 +11,8 @@ from collections import OrderedDict from contextlib import contextmanager +import six + import dask import docrep from distributed import LocalCluster @@ -326,7 +328,7 @@ def start_workers(self, n=1): for _ in range(num_jobs): with self.job_file() as fn: out = self._submit_job(fn) - job = self._job_id_from_submit_output(out.decode()) + job = self._job_id_from_submit_output(out) if not job: raise ValueError('Unable to parse jobid from output of %s' % out) logger.debug("started job: %s", job) @@ -337,43 +339,49 @@ def scheduler(self): """ The scheduler of this cluster """ return self.local_cluster.scheduler - def _calls(self, cmds, **kwargs): - """ Call a command using subprocess.communicate + def _call(self, cmd, **kwargs): + """ Call a command using subprocess.Popen. - This centralizes calls out to the command line, providing consistent outputs, logging, and an opportunity - to go asynchronous in the future + This centralizes calls out to the command line, providing consistent + outputs, logging, and an opportunity to go asynchronous in the future. Parameters ---------- - cmd: List(List(str)) - A list of commands, each of which is a list of strings to hand to subprocess.communicate + cmd: List(str)) + A command, each of which is a list of strings to hand to + subprocess.Popen Examples -------- - >>> self._calls([['ls'], ['ls', '/foo']]) + >>> self._call(['ls', '/foo']) Returns ------- - The stdout result as a string - Also logs any stderr information - """ - logger.debug("Submitting the following calls to command line") - procs = [] - for cmd in cmds: - logger.debug(' '.join(cmd)) - procs.append(subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)) - - result = [] - for proc in procs: - out, err = proc.communicate() - if err: - raise RuntimeError(err.decode()) - result.append(out) - return result + The stdout produced by the command, as string. - def _call(self, cmd, **kwargs): - """ Singular version of _calls """ - return self._calls([cmd], **kwargs)[0] + Raises + ------ + RuntimeError if the command exits with a non-zero exit code + """ + cmd_str = ' '.join(cmd) + logger.debug("Executing the following command to command line\n{}".format(cmd_str)) + + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs) + + out, err = proc.communicate() + if six.PY3: + out, err = out.decode(), err.decode() + if proc.returncode != 0: + raise RuntimeError('Command exited with non-zero exit code.\n' + 'Exit code: {}\n' + 'Command:\n{}\n' + 'stdout:\n{}\n' + 'stderr:\n{}\n'.format(proc.returncode, + cmd_str, out, err)) + return out def stop_workers(self, workers): """ Stop a list of workers""" diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 888b2bc3..9ad4ec39 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -1,10 +1,12 @@ from __future__ import absolute_import, division, print_function import os -import pytest import shutil import socket import sys +import re + +import pytest from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster) @@ -107,3 +109,25 @@ def test_log_directory(tmpdir): with PBSCluster(cores=1, memory='1GB', log_directory=tmpdir.strpath): assert os.path.exists(tmpdir.strpath) + + +def test_jobqueue_cluster_call(tmpdir): + cluster = PBSCluster(cores=1, memory='1GB') + + path = tmpdir.join('test.py') + path.write('print("this is the stdout")') + + out = cluster._call([sys.executable, path.strpath]) + assert out == 'this is the stdout\n' + + path_with_error = tmpdir.join('non-zero-exit-code.py') + path_with_error.write('print("this is the stdout")\n1/0') + + match = ('Command exited with non-zero exit code.+' + 'Exit code: 1.+' + 'stdout:\nthis is the stdout.+' + 'stderr:.+ZeroDivisionError') + + match = re.compile(match, re.DOTALL) + with pytest.raises(RuntimeError, match=match): + cluster._call([sys.executable, path_with_error.strpath])