diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 1dddd743d85..f00b9422f71 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -440,6 +440,20 @@ def __init__( self, minSize = 2, maxSize = 0, maxQueuedRequests = 10, self.__bulletCounter = 0 self.__daemonProcess = False self.__spawnNeededWorkingProcesses() + + def stopProcessing( self ): + """ case fire + + :param self: self reference + """ + self.finalize() + + def startProcessing( self ): + """ restrat processing again + + :param self: self reference + """ + self.__draining = False def setPoolCallback( self, callback ): """ set ProcessPool callback function @@ -737,7 +751,6 @@ def __filicide( self ): os.kill( worker.pid(), signal.SIGKILL ) del self.__workersDict[pid] - def daemonize( self ): """ Make ProcessPool a finite being for opening and closing doors between chambers. Also just run it in a separate background thread to the death of PID 0. diff --git a/DataManagementSystem/private/RequestAgentBase.py b/DataManagementSystem/private/RequestAgentBase.py index 243ac237637..1616b98eb4d 100644 --- a/DataManagementSystem/private/RequestAgentBase.py +++ b/DataManagementSystem/private/RequestAgentBase.py @@ -135,7 +135,7 @@ def resetRequests( self ): :param self: self reference """ - self.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) + self.log.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) for requestName, requestTuple in self.__requestHolder.items(): requestString, requestServer = requestTuple reset = self.requestClient().updateRequest( requestName, requestString, requestServer ) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 49b72565e1f..2030df82e91 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -1,7 +1,19 @@ +################################################################################ +# $HeadURL$ +################################################################################ + """ This is the SRM2 StorageClass """ __RCSID__ = "$Id$" +## imports +import os +import re +import time +import tempfile +from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE +from types import ListType, StringTypes, StringType, DictType, IntType +## from DIRAC from DIRAC import gLogger, gConfig, S_OK, S_ERROR from DIRAC.Resources.Storage.StorageBase import StorageBase from DIRAC.Core.Security.ProxyInfo import getProxyInfo @@ -10,13 +22,16 @@ from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Utilities.File import getSize +from DIRAC.Core.Utilities.Subprocess import shellCall from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient -from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE -import types, re, os, time - +################################################################################ class SRM2Storage( StorageBase ): + """ + .. class:: SRM2Storage + + """ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath ): self.isok = True @@ -311,9 +326,9 @@ def getTransportURL( self, path, protocols = False ): if not protocols['OK']: return protocols listProtocols = protocols['Value'] - elif type( protocols ) == types.StringType: + elif type( protocols ) == StringType: listProtocols = [protocols] - elif type( protocols ) == types.ListType: + elif type( protocols ) == ListType: listProtocols = protocols else: return S_ERROR( "SRM2Storage.getTransportURL: Must supply desired protocols to this plug-in." ) @@ -688,8 +703,28 @@ def __putFile( self, src_file, dest_url, sourceSize ): nbstreams = 1 gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams ) gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) + + ## create pipe + pipeName = "%s/%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) + useFIFO = False + try: + os.mkfifo( pipeName ) + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ), timeout = 10 ) + if ret["OK"]: + useFIFO = True + gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) + except OSError, error: + gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) + + if useFIFO: + src_url = "file:%s" % pipeName + res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url, srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc ) + + if useFIFO: + os.unlink( pipeName ) + if not res['OK']: # Remove the failed replica, just in case result = self.__executeOperation( dest_url, 'removeFile' ) @@ -737,16 +772,16 @@ def __lcg_cp_wrapper( self, src_url, dest_url, srctype, dsttype, nbstreams, dsttype, self.nobdii, self.voName, nbstreams, self.conf_file, self.insecure, self.verbose, timeout, src_spacetokendesc, dest_spacetokendesc ) - if type( errCode ) not in [types.IntType]: + if type( errCode ) != IntType: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer", "%s %s" % ( errCode, type( errCode ) ) ) - if type( errCode ) in [types.ListType]: + if type( errCode ) != ListType: msg = [] for err in errCode: msg.append( '%s of type %s' % ( err, type( err ) ) ) gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was List:\n" , "\n".join( msg ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer" ) - if type( errStr ) not in types.StringTypes: + if type( errStr ) not in StringTypes: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string", "%s %s" % ( errCode, type( errStr ) ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string" ) @@ -1317,13 +1352,13 @@ def __removeSubDirectories( self, subDirectories ): return resDict def checkArgumentFormat( self, path ): - if type( path ) in types.StringTypes: + if type( path ) in StringTypes: urls = {path:False} - elif type( path ) == types.ListType: + elif type( path ) == ListType: urls = {} for url in path: urls[url] = False - elif type( path ) == types.DictType: + elif type( path ) == DictType: urls = path else: return S_ERROR( "SRM2Storage.checkArgumentFormat: Supplied path is not of the correct format." )