diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 9e1edbe218e..1dddd743d85 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -145,11 +145,6 @@ def __init__( self, pendingQueue, resultsQueue ): """ multiprocessing.Process.__init__( self ) self.daemon = True - if LockRing: - #Reset all locks - lr = LockRing() - lr._openAll() - lr._setAllEvents() self.__working = multiprocessing.Value( 'i', 0 ) self.__pendingQueue = pendingQueue self.__resultsQueue = resultsQueue @@ -170,12 +165,12 @@ def run( self ): :param self: self reference """ + if LockRing: + # Reset all locks + lr = LockRing() + lr._openAll() + lr._setAllEvents() while True: - if LockRing: - # Reset all locks - lr = LockRing() - lr._openAll() - lr._setAllEvents() try: task = self.__pendingQueue.get( block = True, timeout = 10 ) except Queue.Empty: @@ -185,10 +180,10 @@ def run( self ): self.__working.value = 1 try: task.process() + if task.hasCallback() or task.usePoolCallbacks(): + self.__resultsQueue.put( task, block = True ) finally: self.__working.value = 0 - if task.hasCallback() or task.usePoolCallbacks(): - self.__resultsQueue.put( task, block = True ) class BulletTask: """ dum-dum bullet """ @@ -250,6 +245,7 @@ def __call__( self ): self.__taskID = taskID self.__resultCallback = callback self.__exceptionCallback = exceptionCallback + self.__timeOut = 0 ## set time out self.setTimeOut( timeOut ) self.__done = False @@ -362,7 +358,6 @@ def doCallback( self ): if self.__done and not self.__exceptionRaised and self.__resultCallback: self.__resultCallback( self, self.__taskResult ) - def process( self ): """ execute task @@ -437,7 +432,9 @@ def __init__( self, minSize = 2, maxSize = 0, maxQueuedRequests = 10, self.__pendingQueue = multiprocessing.Queue( self.__maxQueuedRequests ) self.__resultsQueue = multiprocessing.Queue( 0 ) self.__prListLock = threading.Lock() - self.__workingProcessList = [] + + self.__workersDict = {} + self.__draining = False self.__bullet = BulletTask() self.__bulletCounter = 0 @@ -482,9 +479,7 @@ def getNumWorkingProcesses( self ): counter = 0 self.__prListLock.acquire() try: - for proc in self.__workingProcessList: - if proc.isWorking(): - counter += 1 + counter = len( [ pid for pid, worker in self.__workersDict.items() if worker.isWorking() ] ) finally: self.__prListLock.release() return counter @@ -497,9 +492,7 @@ def getNumIdleProcesses( self ): counter = 0 self.__prListLock.acquire() try: - for proc in self.__workingProcessList: - if not proc.isWorking(): - counter += 1 + counter = len( [ pid for pid, worker in self.__workersDict.items() if not worker.isWorking() ] ) finally: self.__prListLock.release() return counter @@ -516,21 +509,28 @@ def __spawnWorkingProcess( self ): :param self: self reference """ - self.__workingProcessList.append( WorkingProcess( self.__pendingQueue, self.__resultsQueue ) ) + self.__prListLock.acquire() + try: + worker = WorkingProcess( self.__pendingQueue, self.__resultsQueue ) + while worker.pid == None: + time.sleep(0.1) + self.__workersDict[ worker.pid ] = worker + finally: + self.__prListLock.release() def __killWorkingProcess( self ): """ suspend execution of WorkingProcesses exceeding queue limits :param self: self reference """ - try: - self.__pendingQueue.put( self.__bullet, block = True ) - except Queue.Full: - return S_ERROR( "Queue is full" ) self.__prListLock.acquire() try: self.__bulletCounter += 1 + self.__pendingQueue.put( self.__bullet, block = True ) + except Queue.Full: + self.__bulletCounter -= 1 finally: self.__prListLock.release() + self.__cleanDeadProcesses() def __cleanDeadProcesses( self ): @@ -538,13 +538,10 @@ def __cleanDeadProcesses( self ): ## check wounded processes self.__prListLock.acquire() try: - stillAlive = [] - for workingProcess in self.__workingProcessList: - if workingProcess.is_alive(): - stillAlive.append( workingProcess ) - else: + for pid, worker in self.__workersDict.items(): + if not worker.is_alive(): self.__bulletCounter -= 1 - self.__workingProcessList = stillAlive + del self.__workersDict[pid] finally: self.__prListLock.release() @@ -557,12 +554,12 @@ def __spawnNeededWorkingProcesses( self ): # If we are draining do not spawn processes if self.__draining: return - while len( self.__workingProcessList ) < self.__minSize: + while len( self.__workersDict ) < self.__minSize: self.__spawnWorkingProcess() while self.hasPendingTasks() and \ self.getNumIdleProcesses() == 0 and \ - len( self.__workingProcessList ) < self.__maxSize: + len( self.__workersDict ) < self.__maxSize: self.__spawnWorkingProcess() time.sleep( 0.1 ) @@ -572,7 +569,7 @@ def __killExceedingWorkingProcesses( self ): :param self: self reference """ self.__cleanDeadProcesses() - toKill = max( len( self.__workingProcessList ) - self.__maxSize, 0 ) + toKill = max( len( self.__workersDict ) - self.__maxSize, 0 ) while toKill: self.__killWorkingProcess() toKill = toKill - 1 @@ -588,19 +585,24 @@ def queueTask( self, task, blocking = True, usePoolCallbacks= False ): :param ProcessTask task: new task to execute :param bool blocking: flag to block if necessary and new empty slot is available (default = block) :param bool usePoolCallbacks: flag to trigger execution of pool callbacks (default = don't execute) - """ if not isinstance( task, ProcessTask ): raise TypeError( "Tasks added to the process pool must be ProcessTask instances" ) + if usePoolCallbacks and ( self.__poolCallback or self.__poolExceptionCallback ): + task.enablePoolCallbacks() + + self.__prListLock.acquire() try: - if usePoolCallbacks and ( self.__poolCallback or self.__poolExceptionCallback ): - task.enablePoolCallbacks() self.__pendingQueue.put( task, block = blocking ) except Queue.Full: + self.__prListLock.release() return S_ERROR( "Queue is full" ) + finally: + self.__prListLock.release() + self.__spawnNeededWorkingProcesses() # Throttle a bit to allow task state propagation - time.sleep( 0.01 ) + time.sleep( 0.1 ) return S_OK() def createAndQueueTask( self, @@ -697,18 +699,18 @@ def finalize( self, timeout = 10 ): :param self: self reference :param timeout: seconds to wait before killing """ - #Process all tasks + # Process all tasks self.processAllResults() - #Drain via bullets processes + # Drain via bullets processes self.__draining = True try: - bullets = len( self.__workingProcessList ) - self.__bulletCounter + bullets = len( self.__workersDict ) - self.__bulletCounter while bullets: self.__killWorkingProcess() bullets = bullets - 1 start = time.time() self.__cleanDeadProcesses() - while len( self.__workingProcessList ) > 0: + while len( self.__workersDict ) > 0: if timeout <= 0 or time.time() - start >= timeout: break time.sleep( 0.1 ) @@ -717,9 +719,10 @@ def finalize( self, timeout = 10 ): pass #self.__draining = False # terminate them as it should be done - for wp in self.__workingProcessList: - if wp.is_alive(): - wp.terminate() + for worker in self.__workersDict.values(): + if worker.is_alive(): + worker.terminate() + worker.join() self.__cleanDeadProcesses() # Kill 'em all!! self.__filicide() @@ -727,17 +730,13 @@ def finalize( self, timeout = 10 ): def __filicide( self ): """ Kill all children (processes :P) Kill'em all! ...and justice for all! """ - wpL = [ ( wp, 0 ) for wp in self.__workingProcessList ] - self.__workingProcessList = [] - while wpL: - wp, count = wpL.pop( 0 ) - if wp.pid == None: - if count > 5: - wp.terminate() - else: - wpL.append( ( wp, count + 1 ) ) - continue - os.kill( wp.pid, signal.SIGKILL ) + while self.__workersDict: + pid = self.__workersDict.keys().pop(0) + worker = self.__workersDict[pid] + if worker.is_alive(): + os.kill( worker.pid(), signal.SIGKILL ) + del self.__workersDict[pid] + def daemonize( self ): """ Make ProcessPool a finite being for opening and closing doors between chambers. diff --git a/DataManagementSystem/Agent/TransferAgent.py b/DataManagementSystem/Agent/TransferAgent.py index 46065c8f6f2..d4c84ee90b7 100755 --- a/DataManagementSystem/Agent/TransferAgent.py +++ b/DataManagementSystem/Agent/TransferAgent.py @@ -598,13 +598,6 @@ def executeFTS( self, requestDict ): ownerGroup["Value"] ) ) return S_OK( False ) - ## check request owner - #ownerDN = requestObj.getAttribute( "OwnerDN" ) - #if ownerDN["OK"] and ownerDN["Value"]: - # self.log.info("excuteFTS: request %s has its owner %s, can't use FTS" % ( requestDict["requestName"], - # ownerDN["Value"] ) ) - # return S_OK( False ) - ## check operation res = requestObj.getNumSubRequests( "transfer" ) if not res["OK"]: @@ -679,7 +672,7 @@ def schedule( self, requestDict ): requestDict = { "requestString" : str, "requestName" : str, "sourceServer" : str, - "executionOrder" : list, + "executionOrder" : int, "jobID" : int, "requestObj" : RequestContainer } @@ -695,14 +688,22 @@ def schedule( self, requestDict ): self.log.error( "schedule: Failed to get number of 'transfer' subrequests", res["Message"] ) return S_ERROR( "schedule: Failed to get number of 'transfer' subrequests" ) numberRequests = res["Value"] - self.log.info( "schedule: '%s' found with %s 'transfer' subrequest(s)" % ( requestName, - numberRequests ) ) + self.log.info( "schedule: request '%s'has got %s 'transfer' subrequest(s)" % ( requestName, + numberRequests ) ) for iSubRequest in range( numberRequests ): - self.log.info( "schedule: treating subrequest %s from '%s'." % ( iSubRequest, + self.log.info( "schedule: treating subrequest %s from '%s'" % ( iSubRequest, requestName ) ) subAttrs = requestObj.getSubRequestAttributes( iSubRequest, "transfer" )["Value"] + subRequestStatus = subAttrs["Status"] + #executionOrder = int(subAttrs["ExecutionOrder"]) if "ExecutionOrder" in subAttrs else 0 + #if executionOrder > requestDict["executionOrder"]: + # self.info.warn("schedule: skipping %s subrequest, executionOrder %s > request's executionOrder " % ( iSubRequest, + # executionOrder, + # requestDict["executionOrder"] ) ) + # continue + if subRequestStatus != "Waiting" : ## sub-request is already in terminal state self.log.info( "schedule: subrequest %s status is '%s', it won't be executed" % ( iSubRequest, @@ -792,8 +793,7 @@ def scheduleFiles( self, requestObj, index, subAttrs ): :param RequestContainer requestObj: request being processed :param dict subAttrs: subrequest's attributes """ - self.log.info( "scheduleFiles: *** FTS scheduling ***") - self.log.info( "scheduleFiles: processing subrequest %s" % index ) + self.log.info( "scheduleFiles: FTS scheduling, processing subrequest %s" % index ) ## get source SE sourceSE = subAttrs["SourceSE"] if subAttrs["SourceSE"] not in ( None, "None", "" ) else None ## get target SEs, no matter what's a type we need a list @@ -808,7 +808,7 @@ def scheduleFiles( self, requestObj, index, subAttrs ): if not subRequestFiles["OK"]: return subRequestFiles subRequestFiles = subRequestFiles["Value"] - self.log.info( "scheduleFiles: found %s files" % len( subRequestFiles ) ) + self.log.debug( "scheduleFiles: found %s files" % len( subRequestFiles ) ) ## collect not done LFNS notDoneLFNs = [] for subRequestFile in subRequestFiles: @@ -817,7 +817,7 @@ def scheduleFiles( self, requestObj, index, subAttrs ): notDoneLFNs.append( subRequestFile["LFN"] ) ## get subrequest files - self.log.info( "scheduleFiles: obtaining 'Waiting' files for %d subrequest" % index ) + self.log.debug( "scheduleFiles: obtaining 'Waiting' files for %d subrequest" % index ) files = self.collectFiles( requestObj, index, status = "Waiting" ) if not files["OK"]: self.log.debug("scheduleFiles: failed to get 'Waiting' files from subrequest", files["Message"] ) @@ -826,7 +826,7 @@ def scheduleFiles( self, requestObj, index, subAttrs ): waitingFiles, replicas, metadata = files["Value"] if not waitingFiles: - self.log.info("scheduleFiles: not 'Waiting' files found in this subrequest" ) + self.log.debug("scheduleFiles: not 'Waiting' files found in this subrequest" ) return S_OK( requestObj ) if not replicas or not metadata: @@ -1002,8 +1002,7 @@ def registerFiles( self, requestObj, index ): :param RequestContainer requestObj: request being processed :param dict subAttrs: subrequest's attributes """ - self.log.info( "registerFiles: *** failover registration *** ") - self.log.info( "registerFiles: obtaining all files in %d subrequest" % index ) + self.log.info( "registerFiles: failover registration, processing %s subrequest" % index ) subRequestFiles = requestObj.getSubRequestFiles( index, "transfer" ) if not subRequestFiles["OK"]: return subRequestFiles @@ -1169,7 +1168,7 @@ def determineReplicationTree( self, sourceSE, targetSEs, replicas, size, strateg self.sigma = float(strategy.split("_")[1]) self.log.debug("determineReplicationTree: new sigma %s" % self.sigma ) except ValueError: - self.log.warn("determineRepliactionTree: can't set new sigma value from '%s'" % strategy ) + self.log.warn("determineReplicationTree: can't set new sigma value from '%s'" % strategy ) if reStrategy.pattern in [ "MinimiseTotalWait", "DynamicThroughput" ]: replicasToUse = replicas.keys() if sourceSE == None else [ sourceSE ] tree = self.strategyDispatcher[ reStrategy ].__call__( replicasToUse, targetSEs ) diff --git a/DataManagementSystem/private/RequestAgentBase.py b/DataManagementSystem/private/RequestAgentBase.py index 5dfeb2ab1f5..243ac237637 100644 --- a/DataManagementSystem/private/RequestAgentBase.py +++ b/DataManagementSystem/private/RequestAgentBase.py @@ -135,6 +135,7 @@ def resetRequests( self ): :param self: self reference """ + self.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) for requestName, requestTuple in self.__requestHolder.items(): requestString, requestServer = requestTuple reset = self.requestClient().updateRequest( requestName, requestString, requestServer ) @@ -321,6 +322,8 @@ def execute( self ): requestDict = requestDict["Value"] requestDict["configPath"] = self.__configPath taskID = requestDict["requestName"] + self.log.info( "processPool tasks idle = %s working = %s" % ( self.processPool().getNumIdleProcesses(), + self.processPool().getNumWorkingProcesses() ) ) while True: if not self.processPool().getFreeSlots(): self.log.info("No free slots available in processPool, will wait a second to proceed...") @@ -342,17 +345,17 @@ def execute( self ): ## task created, a little time kick to proceed time.sleep( 0.1 ) break + return S_OK() def finalize( self ): - """ clean ending of one cycle execution + """ clean ending of maxcycle execution :param self: self reference """ ## finalize all processing if self.hasProcessPool(): - self.processPool().processAllResults() self.processPool().finalize() ## reset failover requests for further processing self.resetRequests() diff --git a/Resources/Catalog/FileCatalogFactory.py b/Resources/Catalog/FileCatalogFactory.py index 55d07ee2c49..f1c74c7d4ba 100644 --- a/Resources/Catalog/FileCatalogFactory.py +++ b/Resources/Catalog/FileCatalogFactory.py @@ -64,7 +64,7 @@ def createCatalog( self, catalogName ): errStr = "Failed to instantiate catalog plug in" gLogger.error( errStr, moduleName ) return S_ERROR( errStr ) - self.log.info('Loaded module %sClient from %s' % ( catalogType, moduleRootPath ) ) + self.log.debug('Loaded module %sClient from %s' % ( catalogType, moduleRootPath ) ) return S_OK( catalog ) except Exception, x: errStr = "Failed to instantiate %s()" % ( moduleName ) @@ -73,4 +73,4 @@ def createCatalog( self, catalogName ): # Catalog module was not loaded return S_ERROR('No suitable client found for %s' % catalogName) - \ No newline at end of file +