From 38d8d1e5c3164fd5d8ad49dec3184dbb2a89d6d7 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 19 May 2023 11:13:43 +0200 Subject: [PATCH] fix: SSHCE getPilotOutput --- .../Computing/SSHComputingElement.py | 223 +++++++++--------- 1 file changed, 105 insertions(+), 118 deletions(-) diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index 0bc1fa00705..99afd4697c3 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -62,11 +62,12 @@ **Code Documentation** """ -import os +import errno import json -import stat +import os +import pexpect import shutil -import errno +import stat from urllib.parse import urlparse from urllib.parse import quote from urllib.parse import unquote @@ -136,46 +137,36 @@ def __ssh_call(self, command, timeout): if not timeout: timeout = 999 + ssh_newkey = "Are you sure you want to continue connecting" try: - import pexpect - - ssh_newkey = "Are you sure you want to continue connecting" - try: - child = pexpect.spawn(command, timeout=timeout, encoding="utf-8") - i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "]) + child = pexpect.spawn(command, timeout=timeout, encoding="utf-8") + i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "]) + if i == 0: # Timeout + return S_OK((-1, child.before, "SSH login failed")) + + if i == 1: # SSH does not have the public key. Just accept it. + child.sendline("yes") + child.expect("assword: ") + i = child.expect([pexpect.TIMEOUT, "assword: "]) if i == 0: # Timeout - return S_OK((-1, child.before, "SSH login failed")) - elif i == 1: # SSH does not have the public key. Just accept it. - child.sendline("yes") - child.expect("assword: ") - i = child.expect([pexpect.TIMEOUT, "assword: "]) - if i == 0: # Timeout - return S_OK((-1, str(child.before) + str(child.after), "SSH login failed")) - elif i == 1: - child.sendline(self.password) - child.expect(pexpect.EOF) - return S_OK((0, child.before, "")) - elif i == 2: - # Passwordless login, get the output - return S_OK((0, child.before, "")) - - if self.password: + return S_OK((-1, str(child.before) + str(child.after), "SSH login failed")) + if i == 1: child.sendline(self.password) child.expect(pexpect.EOF) return S_OK((0, child.before, "")) - return S_ERROR((-2, child.before, "")) - except Exception as x: - res = (-1, f"Encountered exception {Exception}: {str(x)}") - return S_ERROR(res) - except BaseException: - from DIRAC.Core.Utilities.Subprocess import shellCall - - # Try passwordless login - result = shellCall(timeout, command) - # print ( "!!! SSH command: %s returned %s\n" % (command, result) ) - if result["Value"][0] == 255: - return S_ERROR((-1, f"Cannot connect to host {self.host}", "")) - return result + + if i == 2: + # Passwordless login, get the output + return S_OK((0, child.before, "")) + + if self.password: + child.sendline(self.password) + child.expect(pexpect.EOF) + return S_OK((0, child.before, "")) + + return S_ERROR(f"Unknown error: {child.before}") + except Exception as x: + return S_ERROR(f"Encountered exception: {str(x)}") def sshCall(self, timeout, cmdSeq): """Execute remote command via a ssh remote call @@ -423,7 +414,7 @@ def _prepareRemoteHost(self, host=None): self.log.verbose(f"Creating working directories on {self.ceParameters['SSHHost']}") result = ssh.sshCall(30, cmd) if not result["OK"]: - self.log.error("Failed creating working directories", f"({result['Message'][1]})") + self.log.error("Failed creating working directories", f"({result['Message']})") return result status, output, _error = result["Value"] if status == -1: @@ -443,7 +434,7 @@ def _prepareRemoteHost(self, host=None): remoteScript = f"{self.sharedArea}/execute_batch" result = ssh.scpCall(30, localScript, remoteScript, postUploadCommand=f"chmod +x {remoteScript}") if not result["OK"]: - self.log.warn(f"Failed uploading control script: {result['Message'][1]}") + self.log.warn(f"Failed uploading control script: {result['Message']}") return result status, output, _error = result["Value"] if status != 0: @@ -634,13 +625,11 @@ def killJob(self, jobIDList): def _killJobOnHost(self, jobIDList, host=None): """Kill the jobs for the given list of job IDs""" - jobDict = {} - for job in jobIDList: - stamp = os.path.basename(urlparse(job).path) - jobDict[stamp] = job - stampList = list(jobDict) + batchSystemJobList = [] + for jobID in jobIDList: + batchSystemJobList.append(os.path.basename(urlparse(jobID.split(":::")[0]).path)) - commandOptions = {"JobIDList": stampList, "User": self.user} + commandOptions = {"JobIDList": batchSystemJobList, "User": self.user} resultCommand = self.__executeHostCommand("killJob", commandOptions, host=host) if not resultCommand["OK"]: return resultCommand @@ -654,18 +643,6 @@ def _killJobOnHost(self, jobIDList, host=None): return S_OK(len(result["Successful"])) - def _getHostStatus(self, host=None): - """Get jobs running at a given host""" - resultCommand = self.__executeHostCommand("getCEStatus", {}, host=host) - if not resultCommand["OK"]: - return resultCommand - - result = resultCommand["Value"] - if result["Status"] != 0: - return S_ERROR(f"Failed to get CE status: {result['Message']}") - - return S_OK(result) - def getCEStatus(self): """Method to return information on running and pending jobs.""" result = S_OK() @@ -686,21 +663,31 @@ def getCEStatus(self): return result + def _getHostStatus(self, host=None): + """Get jobs running at a given host""" + resultCommand = self.__executeHostCommand("getCEStatus", {}, host=host) + if not resultCommand["OK"]: + return resultCommand + + result = resultCommand["Value"] + if result["Status"] != 0: + return S_ERROR(f"Failed to get CE status: {result['Message']}") + + return S_OK(result) + def getJobStatus(self, jobIDList): """Get the status information for the given list of jobs""" return self._getJobStatusOnHost(jobIDList) def _getJobStatusOnHost(self, jobIDList, host=None): """Get the status information for the given list of jobs""" - resultDict = {} - jobDict = {} - for job in jobIDList: - stamp = os.path.basename(urlparse(job).path) - jobDict[stamp] = job - stampList = list(jobDict) + batchSystemJobDict = {} + for jobID in jobIDList: + batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path) + batchSystemJobDict[batchSystemJobID] = jobID - for jobList in breakListIntoChunks(stampList, 100): + for jobList in breakListIntoChunks(list(batchSystemJobDict), 100): resultCommand = self.__executeHostCommand("getJobStatus", {"JobIDList": jobList}, host=host) if not resultCommand["OK"]: return resultCommand @@ -709,14 +696,54 @@ def _getJobStatusOnHost(self, jobIDList, host=None): if result["Status"] != 0: return S_ERROR(f"Failed to get job status: {result['Message']}") - for stamp in result["Jobs"]: - resultDict[jobDict[stamp]] = result["Jobs"][stamp] + for batchSystemJobID in result["Jobs"]: + resultDict[batchSystemJobDict[batchSystemJobID]] = result["Jobs"][batchSystemJobID] return S_OK(resultDict) + def getJobOutput(self, jobID, localDir=None): + """Get the specified job standard output and error files. If the localDir is provided, + the output is returned as file in this directory. Otherwise, the output is returned + as strings. + """ + self.log.verbose("Getting output for jobID", jobID) + result = self._getJobOutputFiles(jobID) + if not result["OK"]: + return result + + batchSystemJobID, host, outputFile, errorFile = result["Value"] + + if localDir: + localOutputFile = f"{localDir}/{batchSystemJobID}.out" + localErrorFile = f"{localDir}/{batchSystemJobID}.err" + else: + localOutputFile = "Memory" + localErrorFile = "Memory" + + # Take into account the SSHBatch possible SSHHost syntax + host = host.split("/")[0] + + ssh = SSH(host=host, parameters=self.ceParameters) + resultStdout = ssh.scpCall(30, localOutputFile, outputFile, upload=False) + if not resultStdout["OK"]: + return resultStdout + + resultStderr = ssh.scpCall(30, localErrorFile, errorFile, upload=False) + if not resultStderr["OK"]: + return resultStderr + + if localDir: + output = localOutputFile + error = localErrorFile + else: + output = resultStdout["Value"][1] + error = resultStderr["Value"][1] + + return S_OK((output, error)) + def _getJobOutputFiles(self, jobID): """Get output file names for the specific CE""" - jobStamp = os.path.basename(urlparse(jobID).path) + batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path) # host can be retrieved from the path of the jobID # it might not be present, in this case host is an empty string and will be defined by the CE parameters later host = os.path.dirname(urlparse(jobID).path).lstrip("/") @@ -726,19 +753,19 @@ def _getJobOutputFiles(self, jobID): self.errorTemplate = self.ceParameters["ErrorTemplate"] if self.outputTemplate: - output = self.outputTemplate % jobStamp - error = self.errorTemplate % jobStamp + output = self.outputTemplate % batchSystemJobID + error = self.errorTemplate % batchSystemJobID elif "OutputTemplate" in self.ceParameters: self.outputTemplate = self.ceParameters["OutputTemplate"] self.errorTemplate = self.ceParameters["ErrorTemplate"] - output = self.outputTemplate % jobStamp - error = self.errorTemplate % jobStamp + output = self.outputTemplate % batchSystemJobID + error = self.errorTemplate % batchSystemJobID elif hasattr(self.batchSystem, "getJobOutputFiles"): # numberOfNodes is treated as a string as it can contain values such as "2-4" # where 2 would represent the minimum number of nodes to allocate, and 4 the maximum numberOfNodes = self.ceParameters.get("NumberOfNodes", "1") commandOptions = { - "JobIDList": [jobStamp], + "JobIDList": [batchSystemJobID], "OutputDir": self.batchOutput, "ErrorDir": self.batchError, "NumberOfNodes": numberOfNodes, @@ -755,50 +782,10 @@ def _getJobOutputFiles(self, jobID): self.outputTemplate = result["OutputTemplate"] self.errorTemplate = result["ErrorTemplate"] - output = result["Jobs"][jobStamp]["Output"] - error = result["Jobs"][jobStamp]["Error"] - else: - output = f"{self.batchOutput}/{jobStamp}.out" - error = f"{self.batchError}/{jobStamp}.err" - - return S_OK((jobStamp, host, output, error)) - - def getJobOutput(self, jobID, localDir=None): - """Get the specified job standard output and error files. If the localDir is provided, - the output is returned as file in this directory. Otherwise, the output is returned - as strings. - """ - self.log.verbose("Getting output for jobID", jobID) - result = self._getJobOutputFiles(jobID) - if not result["OK"]: - return result - - jobStamp, host, outputFile, errorFile = result["Value"] - - if localDir: - localOutputFile = f"{localDir}/{jobStamp}.out" - localErrorFile = f"{localDir}/{jobStamp}.err" - else: - localOutputFile = "Memory" - localErrorFile = "Memory" - - # Take into account the SSHBatch possible SSHHost syntax - host = host.split("/")[0] - - ssh = SSH(host=host, parameters=self.ceParameters) - resultStdout = ssh.scpCall(30, localOutputFile, outputFile, upload=False) - if not resultStdout["OK"]: - return resultStdout - - resultStderr = ssh.scpCall(30, localErrorFile, errorFile, upload=False) - if not resultStderr["OK"]: - return resultStderr - - if localDir: - output = localOutputFile - error = localErrorFile + output = result["Jobs"][batchSystemJobID]["Output"] + error = result["Jobs"][batchSystemJobID]["Error"] else: - output = resultStdout["Value"][1] - error = resultStderr["Value"][1] + output = f"{self.batchOutput}/{batchSystemJobID}.out" + error = f"{self.batchError}/{batchSystemJobID}.err" - return S_OK((output, error)) + return S_OK((batchSystemJobID, host, output, error))