From 270c4ba18c1ef974843efe1c3b6822edcf71ef44 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 14:19:41 +0100 Subject: [PATCH 1/3] fix: Ensure all output is caputured by executeAndGetOutput --- Pilot/pilotTools.py | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 37d80c06..c8091bd8 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,6 +18,7 @@ import signal import subprocess import ssl +import fcntl from datetime import datetime from functools import partial from distutils.version import LooseVersion @@ -534,47 +535,36 @@ def executeAndGetOutput(self, cmd, environDict=None): "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False ) - # simple filter to strip out non-ascii characters - def ascii_filter(in_chr): - if ord(in_chr) < 128: - return in_chr - else: - return "" + # Use non-blocking I/O on the process pipes + for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) outData = "" - isRunning = True - while isRunning: + while True: readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) - if not readfd: - # not sure if this error is possible - break + dataWasRead = False for stream in readfd: - # ignore codepoint splitting problems; not worth it - outChunk = stream.read(1024).decode("ascii", "replace") - outChunk = "".join(filter(ascii_filter, outChunk)) + outChunk = stream.read().decode("ascii", "replace") if not outChunk: - # file has reached EOF, program finished - isRunning = False - # Finish processing FDs in case there is still some - # remaining data on other file handle... continue + dataWasRead = True + # Strip unicode replacement characters + outChunk = outChunk.replace("\ufffd", "") if stream == _p.stderr: sys.stderr.write(outChunk) sys.stderr.flush() else: sys.stdout.write(outChunk) sys.stdout.flush() - # add outChunk to an existing buffer of the remote logger, if enabled. - if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - self.log.buffer.write(outChunk) outData += outChunk + # If no data was read on any of the pipes then the process has finished + if not dataWasRead: + break # Ensure output ends on a newline sys.stdout.write("\n") sys.stdout.flush() - if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - if not self.log.buffer.getValue().endswith("\n"): - self.log.buffer.write("\n") sys.stderr.write("\n") sys.stderr.flush() From b3a50cf4658768caf0be90417a7bb676752b9d6a Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 14:26:04 +0100 Subject: [PATCH 2/3] refactor: Remove pointless exception handling in executeAndGetOutput --- Pilot/pilotTools.py | 86 +++++++++++++++++++++------------------------ 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index c8091bd8..9e5aa0c1 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -527,54 +527,48 @@ def executeAndGetOutput(self, cmd, environDict=None): """Execute a command on the worker node and get the output""" self.log.info("Executing command %s" % cmd) - try: - # spawn new processes, connect to their input/output/error pipes, and obtain their return codes. - import subprocess + _p = subprocess.Popen( + "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False + ) - _p = subprocess.Popen( - "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False - ) + # Use non-blocking I/O on the process pipes + for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + outData = "" + while True: + readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) + dataWasRead = False + for stream in readfd: + outChunk = stream.read().decode("ascii", "replace") + if not outChunk: + continue + dataWasRead = True + # Strip unicode replacement characters + outChunk = outChunk.replace("\ufffd", "") + if stream == _p.stderr: + sys.stderr.write(outChunk) + sys.stderr.flush() + else: + sys.stdout.write(outChunk) + sys.stdout.flush() + outData += outChunk + # If no data was read on any of the pipes then the process has finished + if not dataWasRead: + break + + # Ensure output ends on a newline + sys.stdout.write("\n") + sys.stdout.flush() + sys.stderr.write("\n") + sys.stderr.flush() - # Use non-blocking I/O on the process pipes - for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - outData = "" - while True: - readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) - dataWasRead = False - for stream in readfd: - outChunk = stream.read().decode("ascii", "replace") - if not outChunk: - continue - dataWasRead = True - # Strip unicode replacement characters - outChunk = outChunk.replace("\ufffd", "") - if stream == _p.stderr: - sys.stderr.write(outChunk) - sys.stderr.flush() - else: - sys.stdout.write(outChunk) - sys.stdout.flush() - outData += outChunk - # If no data was read on any of the pipes then the process has finished - if not dataWasRead: - break - - # Ensure output ends on a newline - sys.stdout.write("\n") - sys.stdout.flush() - sys.stderr.write("\n") - sys.stderr.flush() - - # return code - returnCode = _p.wait() - self.log.debug("Return code of %s: %d" % (cmd, returnCode)) - - return (returnCode, outData) - except ImportError: - self.log.error("Error importing subprocess") + # return code + returnCode = _p.wait() + self.log.debug("Return code of %s: %d" % (cmd, returnCode)) + + return (returnCode, outData) def exitWithError(self, errorCode): """Wrapper around sys.exit()""" From b39d4b952ff08c77406495cc62961901ed097a9e Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 15:00:02 +0100 Subject: [PATCH 3/3] fix: Fix replacing unicode replacement characters in Python 2 --- Pilot/pilotTools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 9e5aa0c1..d1f6ca27 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -546,7 +546,7 @@ def executeAndGetOutput(self, cmd, environDict=None): continue dataWasRead = True # Strip unicode replacement characters - outChunk = outChunk.replace("\ufffd", "") + outChunk = str(outChunk.replace(u"\ufffd", "")) if stream == _p.stderr: sys.stderr.write(outChunk) sys.stderr.flush()