From cd7b09202dddef2c295d644a2bbacb9e3ba42f9c Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 14:19:41 +0100 Subject: [PATCH 1/3] fix: Ensure all output is captured by executeAndGetOutput --- Pilot/pilotTools.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 900b0bda..e7d7189d 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,6 +18,7 @@ import signal import subprocess import select +import fcntl from distutils.version import LooseVersion ############################ @@ -396,30 +397,22 @@ def executeAndGetOutput(self, cmd, environDict=None): "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False ) - # simple filter to strip out non-ascii characters - def ascii_filter(in_chr): - if ord(in_chr) < 128: - return in_chr - else: - return '' + # Use non-blocking I/O on the process pipes + for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) outData = "" - isRunning = True - while isRunning: + while True: readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) - if not readfd: - # not sure if this error is possible - break + dataWasRead = False for stream in readfd: - # ignore codepoint splitting problems; not worth it - outChunk = stream.read(1024).decode("ascii", "replace") - outChunk = ''.join(filter(ascii_filter, outChunk)) + outChunk = stream.read().decode("ascii", "replace") if not outChunk: - # file has reached EOF, program finished - isRunning = False - # Finish processing FDs in case there is still some - # remaining data on other file handle... 
continue + dataWasRead = True + # Strip unicode replacement characters + outChunk = outChunk.replace("\ufffd", "") if stream == _p.stderr: sys.stderr.write(outChunk) sys.stderr.flush() @@ -427,6 +420,9 @@ def ascii_filter(in_chr): sys.stdout.write(outChunk) sys.stdout.flush() outData += outChunk + # If no data was read on any of the pipes then the process has finished + if not dataWasRead: + break # Ensure output ends on a newline sys.stdout.write("\n") From ad5377378252286943f6c0a43fe78f72bca5dd38 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 14:26:04 +0100 Subject: [PATCH 2/3] refactor: Remove pointless exception handling in executeAndGetOutput --- Pilot/pilotTools.py | 90 +++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index e7d7189d..153a0114 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -389,54 +389,48 @@ def executeAndGetOutput(self, cmd, environDict=None): """Execute a command on the worker node and get the output""" self.log.info("Executing command %s" % cmd) - try: - # spawn new processes, connect to their input/output/error pipes, and obtain their return codes. 
- import subprocess - - _p = subprocess.Popen( - "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False - ) - - # Use non-blocking I/O on the process pipes - for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - outData = "" - while True: - readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) - dataWasRead = False - for stream in readfd: - outChunk = stream.read().decode("ascii", "replace") - if not outChunk: - continue - dataWasRead = True - # Strip unicode replacement characters - outChunk = outChunk.replace("\ufffd", "") - if stream == _p.stderr: - sys.stderr.write(outChunk) - sys.stderr.flush() - else: - sys.stdout.write(outChunk) - sys.stdout.flush() - outData += outChunk - # If no data was read on any of the pipes then the process has finished - if not dataWasRead: - break - - # Ensure output ends on a newline - sys.stdout.write("\n") - sys.stdout.flush() - sys.stderr.write("\n") - sys.stderr.flush() - - # return code - returnCode = _p.wait() - self.log.debug("Return code of %s: %d" % (cmd, returnCode)) - - return (returnCode, outData) - except ImportError: - self.log.error("Error importing subprocess") + _p = subprocess.Popen( + "%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False + ) + + # Use non-blocking I/O on the process pipes + for fd in [_p.stdout.fileno(), _p.stderr.fileno()]: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + outData = "" + while True: + readfd, _, _ = select.select([_p.stdout, _p.stderr], [], []) + dataWasRead = False + for stream in readfd: + outChunk = stream.read().decode("ascii", "replace") + if not outChunk: + continue + dataWasRead = True + # Strip unicode replacement characters + outChunk = outChunk.replace("\ufffd", "") + if stream == _p.stderr: + sys.stderr.write(outChunk) + 
sys.stderr.flush() + else: + sys.stdout.write(outChunk) + sys.stdout.flush() + outData += outChunk + # If no data was read on any of the pipes then the process has finished + if not dataWasRead: + break + + # Ensure output ends on a newline + sys.stdout.write("\n") + sys.stdout.flush() + sys.stderr.write("\n") + sys.stderr.flush() + + # return code + returnCode = _p.wait() + self.log.debug("Return code of %s: %d" % (cmd, returnCode)) + + return (returnCode, outData) def exitWithError(self, errorCode): """Wrapper around sys.exit()""" From 134e7ef728340db95b8054e0d7945fbc69273b43 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 22 Nov 2022 15:00:02 +0100 Subject: [PATCH 3/3] fix: Fix replacing unicode replacement characters in Python 2 --- Pilot/pilotTools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 153a0114..db041e0e 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -408,7 +408,7 @@ def executeAndGetOutput(self, cmd, environDict=None): continue dataWasRead = True # Strip unicode replacement characters - outChunk = outChunk.replace("\ufffd", "") + outChunk = str(outChunk.replace(u"\ufffd", "")) if stream == _p.stderr: sys.stderr.write(outChunk) sys.stderr.flush()