From 84931c606f29a05c8c56913ef0f546438bf3ebc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 29 Aug 2018 19:30:28 +0200 Subject: [PATCH 1/4] Raise when submit command exits with a non-zero exit code --- dask_jobqueue/core.py | 53 ++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c5625327..5180fe2d 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -308,43 +308,44 @@ def scheduler(self): """ The scheduler of this cluster """ return self.local_cluster.scheduler - def _calls(self, cmds, **kwargs): - """ Call a command using subprocess.communicate + def _call(self, cmd): + """ Call a command using subprocess.Popen. - This centralizes calls out to the command line, providing consistent outputs, logging, and an opportunity - to go asynchronous in the future + This centralizes calls out to the command line, providing consistent + outputs, logging, and an opportunity to go asynchronous in the future. Parameters ---------- - cmd: List(List(str)) - A list of commands, each of which is a list of strings to hand to subprocess.communicate + cmd: List(str)) + A command, each of which is a list of strings to hand to + subprocess.communicate Examples -------- - >>> self._calls([['ls'], ['ls', '/foo']]) + >>> self._call(['ls', '/foo']) Returns ------- - The stdout result as a string - Also logs any stderr information + The stdout produced by the command + + Raises + ------ + RuntimeError if the command exits with a non-zero exit code """ - logger.debug("Submitting the following calls to command line") - procs = [] - for cmd in cmds: - logger.debug(' '.join(cmd)) - procs.append(subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)) - - result = [] - for proc in procs: - out, err = proc.communicate() - if err: - logger.error(err.decode()) - result.append(out) - return result - - def _call(self, cmd, **kwargs): - """ Singular version of _calls """ - return self._calls([cmd], **kwargs)[0] + cmd_str = ' '.join(cmd) + logger.debug("Executing the following command to command line\n{}".format(cmd_str)) + + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + out, err = proc.communicate() + if proc.returncode != 0: + raise RuntimeError('Command exited with non-zero exit code.\n' + 'Command:\n{}' + 'stdout:\n{}' + 'stderr:\n{}'.format(cmd_str, out, err)) + return out def stop_workers(self, workers): """ Stop a list of workers""" From 824a45872a5f0d941144fca56c34b6442d9c6816 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 30 Aug 2018 00:28:55 +0200 Subject: [PATCH 2/4] Decode stdout and stderr for better error messages. Add tests. --- dask_jobqueue/core.py | 16 +++++++++----- dask_jobqueue/tests/test_jobqueue_core.py | 27 ++++++++++++++++++++++- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5180fe2d..7265f9a5 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -10,6 +10,8 @@ from collections import OrderedDict from contextlib import contextmanager +import six + import dask import docrep from distributed import LocalCluster @@ -299,7 +301,7 @@ def start_workers(self, n=1): for _ in range(num_jobs): with self.job_file() as fn: out = self._submit_job(fn) - job = self._job_id_from_submit_output(out.decode()) + job = self._job_id_from_submit_output(out) logger.debug("started job: %s" % job) self.pending_jobs[job] = {} @@ -326,7 +328,7 @@ def _call(self, cmd): Returns ------- - The stdout produced by the command + The stdout produced by the command, as string. Raises ------ @@ -340,11 +342,15 @@ def _call(self, cmd): stderr=subprocess.PIPE) out, err = proc.communicate() + if six.PY3: + out, err = out.decode(), err.decode() if proc.returncode != 0: raise RuntimeError('Command exited with non-zero exit code.\n' - 'Command:\n{}' - 'stdout:\n{}' - 'stderr:\n{}'.format(cmd_str, out, err)) + 'Exit code: {}\n' + 'Command:\n{}\n' + 'stdout:\n{}\n' + 'stderr:\n{}\n'.format(proc.returncode, + cmd_str, out, err)) return out def stop_workers(self, workers): diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index bc0c9260..a13fe716 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -1,7 +1,10 @@ from __future__ import absolute_import, division, print_function -import pytest import socket +import sys +import re + +import pytest from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster) @@ -79,3 +82,25 @@ def test_job_id_error_handling(Cluster): return_string = 'Job <12345> submited to .' cluster.job_id_regexp = r'(\d+)' cluster._job_id_from_submit_output(return_string) + + +def test_jobqueue_cluster_call(tmpdir): + cluster = PBSCluster(cores=1, memory='1GB') + + path = tmpdir.join('test.py') + path.write('print("this is the stdout")') + + out = cluster._call([sys.executable, path.strpath]) + assert out == 'this is the stdout\n' + + path_with_error = tmpdir.join('non-zero-exit-code.py') + path_with_error.write('print("this is the stdout")\n1/0') + + match = ('Command exited with non-zero exit code.+' + 'Exit code: 1.+' + 'stdout:\nthis is the stdout.+' + 'stderr:.+ZeroDivisionError') + + match = re.compile(match, re.DOTALL) + with pytest.raises(RuntimeError, match=match): + cluster._call([sys.executable, path_with_error.strpath]) From cffe675386c9e5ed64248cac27894d89bf556518 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 16 Oct 2018 08:41:31 +0000 Subject: [PATCH 3/4] identified fixes --- dask_jobqueue/core.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 7265f9a5..a09b180a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -310,7 +310,7 @@ def scheduler(self): """ The scheduler of this cluster """ return self.local_cluster.scheduler - def _call(self, cmd): + def _call(self, cmd, **kwargs): """ Call a command using subprocess.Popen. This centralizes calls out to the command line, providing consistent @@ -320,7 +320,7 @@ def _call(self, cmd): ---------- cmd: List(str)) A command, each of which is a list of strings to hand to - subprocess.communicate + subprocess.Popen Examples -------- @@ -339,7 +339,8 @@ def _call(self, cmd): proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, + **kwargs) out, err = proc.communicate() if six.PY3: From c9c66c56444a9f7cbfb86a4abed322f1f498e582 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 16 Oct 2018 14:28:02 +0200 Subject: [PATCH 4/4] Fix merge gone wrong --- dask_jobqueue/tests/test_jobqueue_core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 11c2794a..9ad4ec39 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -1,5 +1,7 @@ from __future__ import absolute_import, division, print_function +import os +import shutil import socket import sys import re