Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 41 additions & 51 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import signal
import subprocess
import select
import fcntl
from distutils.version import LooseVersion

############################
Expand Down Expand Up @@ -388,59 +389,48 @@ def executeAndGetOutput(self, cmd, environDict=None):
"""Execute a command on the worker node and get the output"""

self.log.info("Executing command %s" % cmd)
try:
# spawn new processes, connect to their input/output/error pipes, and obtain their return codes.
import subprocess

_p = subprocess.Popen(
"%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False
)
_p = subprocess.Popen(
"%s" % cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False
)

# simple filter to strip out non-ascii characters
def ascii_filter(in_chr):
if ord(in_chr) < 128:
return in_chr
# Use non-blocking I/O on the process pipes
for fd in [_p.stdout.fileno(), _p.stderr.fileno()]:
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

outData = ""
while True:
readfd, _, _ = select.select([_p.stdout, _p.stderr], [], [])
dataWasRead = False
for stream in readfd:
outChunk = stream.read().decode("ascii", "replace")
if not outChunk:
continue
dataWasRead = True
# Strip unicode replacement characters
outChunk = str(outChunk.replace(u"\ufffd", ""))
if stream == _p.stderr:
sys.stderr.write(outChunk)
sys.stderr.flush()
else:
return ''

outData = ""
isRunning = True
while isRunning:
readfd, _, _ = select.select([_p.stdout, _p.stderr], [], [])
if not readfd:
# not sure if this error is possible
break
for stream in readfd:
# ignore codepoint splitting problems; not worth it
outChunk = stream.read(1024).decode("ascii", "replace")
outChunk = ''.join(filter(ascii_filter, outChunk))
if not outChunk:
# file has reached EOF, program finished
isRunning = False
# Finish processing FDs in case there is still some
# remaining data on other file handle...
continue
if stream == _p.stderr:
sys.stderr.write(outChunk)
sys.stderr.flush()
else:
sys.stdout.write(outChunk)
sys.stdout.flush()
outData += outChunk

# Ensure output ends on a newline
sys.stdout.write("\n")
sys.stdout.flush()
sys.stderr.write("\n")
sys.stderr.flush()

# return code
returnCode = _p.wait()
self.log.debug("Return code of %s: %d" % (cmd, returnCode))

return (returnCode, outData)
except ImportError:
self.log.error("Error importing subprocess")
sys.stdout.write(outChunk)
sys.stdout.flush()
outData += outChunk
# If no data was read on any of the pipes then the process has finished
if not dataWasRead:
break

# Ensure output ends on a newline
sys.stdout.write("\n")
sys.stdout.flush()
sys.stderr.write("\n")
sys.stderr.flush()

# return code
returnCode = _p.wait()
self.log.debug("Return code of %s: %d" % (cmd, returnCode))

return (returnCode, outData)

def exitWithError(self, errorCode):
"""Wrapper around sys.exit()"""
Expand Down