From 3fb70b0cae72f10f624f8fef9e580a3cd7567cc4 Mon Sep 17 00:00:00 2001 From: ricardo Date: Mon, 23 Apr 2012 12:55:23 +0200 Subject: [PATCH 1/3] FIX: get ProcessingType from JDL if defined. --- .../Agent/StalledJobAgent.py | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/WorkloadManagementSystem/Agent/StalledJobAgent.py b/WorkloadManagementSystem/Agent/StalledJobAgent.py index 19b21014676..c7e9c03e234 100755 --- a/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -16,6 +16,7 @@ from DIRAC import S_OK, S_ERROR from DIRAC.Core.DISET.RPCClient import RPCClient from DIRAC.AccountingSystem.Client.Types.Job import Job +from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd import types class StalledJobAgent( AgentModule ): @@ -28,6 +29,9 @@ class StalledJobAgent( AgentModule ): - finalize() - the graceful exit of the method, this one is usually used for the agent restart """ + jobDB = None + logDB = None + enable = True ############################################################################# def initialize( self ): @@ -36,7 +40,7 @@ def initialize( self ): self.jobDB = JobDB() self.logDB = JobLoggingDB() self.am_setOption( 'PollingTime', 60 * 60 ) - self.enable = self.am_getOption( 'Enable', True ) + self.enable = self.am_getOption( 'Enable', self.enable ) if not self.enable: self.log.info( 'Stalled Job Agent running in disabled mode' ) return S_OK() @@ -265,6 +269,19 @@ def __updateJobStatus( self, job, status, minorstatus = None ): return result + def __getProcessingType( self, jobID ): + """ Get the Processing Type from the JDL, until it is promoted to a real Attribute + """ + processingType = 'unknown' + result = self.jobDB.getJobJDL( jobID, original = True ) + if not result['OK']: + return processingType + classAdJob = ClassAd( result['Value'] ) + if classAdJob.lookupAttribute( 'ProcessingType' ): + processingType = classAdJob.getAttributeString( 'ProcessingType' ) + return processingType + + ############################################################################# def sendAccounting( self, jobID ): """Send WMS accounting data for the given job @@ -339,6 +356,8 @@ def sendAccounting( self, jobID ): if heartBeatTime > lastHeartBeatTime: lastHeartBeatTime = heartBeatTime + processingType = self.__getProcessingType( jobID ) + accountingReport.setStartTime( startTime ) accountingReport.setEndTime() # execTime = toEpoch( endTime ) - toEpoch( startTime ) @@ -349,7 +368,7 @@ def sendAccounting( self, jobID ): 'JobGroup' : jobDict['JobGroup'], 'JobType' : jobDict['JobType'], 'JobClass' : jobDict['JobSplitType'], - 'ProcessingType' : 'unknown', + 'ProcessingType' : processingType, 'FinalMajorStatus' : 'Failed', 'FinalMinorStatus' : 'Stalled', 'CPUTime' : lastCPUTime, From b768e2ea31f12cc53a0b7fa99f70fdb5e16d7b6c Mon Sep 17 00:00:00 2001 From: ricardo Date: Mon, 23 Apr 2012 20:17:03 +0200 Subject: [PATCH 2/3] BUGFIX: typo Value -> OK --- Interfaces/scripts/dirac-admin-add-user.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Interfaces/scripts/dirac-admin-add-user.py b/Interfaces/scripts/dirac-admin-add-user.py index 84b4b317812..b093f04ae3d 100644 --- a/Interfaces/scripts/dirac-admin-add-user.py +++ b/Interfaces/scripts/dirac-admin-add-user.py @@ -81,7 +81,7 @@ def addUserGroup( arg ): Script.gLogger.info( "Setting property %s to %s" % ( pName, pValue ) ) userProps[ pName ] = pValue -if not diracAdmin.csModifyUser( userName, userProps, createIfNonExistant = True )['Value']: +if not diracAdmin.csModifyUser( userName, userProps, createIfNonExistant = True )['OK']: errorList.append( ( "add user", "Cannot register user %s" % userName ) ) exitCode = 255 else: From b6474065c6b887d910de48d96cb5a03d0496181a Mon Sep 17 00:00:00 2001 From: ricardo Date: Mon, 23 Apr 2012 21:25:13 +0200 Subject: [PATCH 3/3] FIX: Adding timeout to urllib2.urlopen call, 60 seconds for .cfgs --- Core/scripts/dirac-install.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/Core/scripts/dirac-install.py b/Core/scripts/dirac-install.py index 58c9856d6c8..924394234ae 100755 --- a/Core/scripts/dirac-install.py +++ b/Core/scripts/dirac-install.py @@ -302,7 +302,7 @@ def __loadCFGFromURL( self, urlcfg, checkHash = False ): if urlcfg in self.__cfgCache: return S_OK( self.__cfgCache[ urlcfg ] ) try: - cfgFile = urllib2.urlopen( urlcfg ) + cfgFile = urllib2.urlopen( urlcfg, timeout = 60 ) except: return S_ERROR( "Could not open %s" % urlcfg ) try: @@ -315,7 +315,7 @@ def __loadCFGFromURL( self, urlcfg, checkHash = False ): self.__cfgCache[ urlcfg ] = cfg return S_OK( cfg ) try: - md5File = urllib2.urlopen( urlcfg[:-4] + ".md5" ) + md5File = urllib2.urlopen( urlcfg[:-4] + ".md5", timeout = 60 ) md5Hex = md5File.read().strip() md5File.close() if md5Hex != md5.md5( cfgData ).hexdigest(): @@ -770,7 +770,7 @@ def urlretrieveTimeout( url, fileName, timeout = 0 ): if timeout: signal.signal( signal.SIGALRM, alarmTimeoutHandler ) # set timeout alarm - signal.alarm( timeout ) + signal.alarm( timeout + 5 ) try: # if "http_proxy" in os.environ and os.environ['http_proxy']: # proxyIP = os.environ['http_proxy'] @@ -778,7 +778,7 @@ def urlretrieveTimeout( url, fileName, timeout = 0 ): # opener = urllib2.build_opener( proxy ) # #opener = urllib2.build_opener() # urllib2.install_opener( opener ) - remoteFD = urllib2.urlopen( url ) + remoteFD = urllib2.urlopen( url, timeout = timeout ) expectedBytes = long( remoteFD.info()[ 'Content-Length' ] ) localFD = open( fileName, "wb" ) receivedBytes = 0L @@ -805,6 +805,8 @@ def urlretrieveTimeout( url, fileName, timeout = 0 ): if x.code == 404: logERROR( "%s does not exist" % url ) return False + except urllib2.URLError: + logError( 'Timeout after %s seconds on transfer request for "%s"' % ( str( timeout ), url ) ) except Exception, x: if x == 'Timeout': logERROR( 'Timeout after %s seconds on transfer request for "%s"' % ( str( timeout ), url ) ) @@ -1219,7 +1221,7 @@ def createPermanentDirLinks(): except Exception, x: logERROR( str( x ) ) return False - + return True def createBashrc(): @@ -1276,7 +1278,7 @@ def createBashrc(): def createCshrc(): """ Create DIRAC environment setting script for the (t)csh shell """ - + proPath = cliParams.targetPath # Now create cshrc at basePath try: @@ -1379,7 +1381,7 @@ def writeDefaultConfiguration(): if not createBashrc(): sys.exit( 1 ) if not createCshrc(): - sys.exit( 1 ) + sys.exit( 1 ) runExternalsPostInstall() writeDefaultConfiguration() installExternalRequirements( cliParams.externalsType )