diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 37d80c06..d1f6ca27 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,6 +18,7 @@ import signal import subprocess import ssl +import fcntl from datetime import datetime from functools import partial from distutils.version import LooseVersion @@ -526,65 +527,48 @@ def executeAndGetOutput(self, cmd, environDict=None): """Execute a command on the worker node and get the output""" self.log.info("Executing command %s" % cmd) - try: - # spawn new processes, connect to their input/output/error pipes, and obtain their return codes. - import subprocess - - _p = subprocess.Popen( - "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False - ) + _p = subprocess.Popen( + "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False + ) - # simple filter to strip out non-ascii characters - def ascii_filter(in_chr): - if ord(in_chr) < 128: - return in_chr + # Use non-blocking I/O on the process pipes + for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + outData = "" + while True: + readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) + dataWasRead = False + for stream in readfd: + outChunk = stream.read().decode("ascii", "replace") + if not outChunk: + continue + dataWasRead = True + # Strip unicode replacement characters + outChunk = str(outChunk.replace(u"\ufffd", "")) + if stream == _p.stderr: + sys.stderr.write(outChunk) + sys.stderr.flush() else: - return "" - - outData = "" - isRunning = True - while isRunning: - readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) - if not readfd: - # not sure if this error is possible - break - for stream in readfd: - # ignore codepoint splitting problems; not worth it - outChunk = stream.read(1024).decode("ascii", "replace") - outChunk = "".join(filter(ascii_filter, outChunk)) - if not outChunk: - # file has reached EOF, program finished - isRunning = False - # Finish processing FDs in case there is still some - # remaining data on other file handle... - continue - if stream == _p.stderr: - sys.stderr.write(outChunk) - sys.stderr.flush() - else: - sys.stdout.write(outChunk) - sys.stdout.flush() - # add outChunk to an existing buffer of the remote logger, if enabled. - if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - self.log.buffer.write(outChunk) - outData += outChunk - - # Ensure output ends on a newline - sys.stdout.write("\n") - sys.stdout.flush() - if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - if not self.log.buffer.getValue().endswith("\n"): - self.log.buffer.write("\n") - sys.stderr.write("\n") - sys.stderr.flush() - - # return code - returnCode = _p.wait() - self.log.debug("Return code of %s: %d" % (cmd, returnCode)) - - return (returnCode, outData) - except ImportError: - self.log.error("Error importing subprocess") + sys.stdout.write(outChunk) + sys.stdout.flush() + outData += outChunk + # If no data was read on any of the pipes then the process has finished + if not dataWasRead: + break + + # Ensure output ends on a newline + sys.stdout.write("\n") + sys.stdout.flush() + sys.stderr.write("\n") + sys.stderr.flush() + + # return code + returnCode = _p.wait() + self.log.debug("Return code of %s: %d" % (cmd, returnCode)) + + return (returnCode, outData) def exitWithError(self, errorCode): """Wrapper around sys.exit()"""