diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 49b72565e1f..b98eee0034f 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -1,7 +1,19 @@ +################################################################################ +# $HeadURL$ +################################################################################ + """ This is the SRM2 StorageClass """ __RCSID__ = "$Id$" +## imports +import os +import re +import time +import tempfile +from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE +from types import ListType, StringTypes, StringType, DictType, IntType +## from DIRAC from DIRAC import gLogger, gConfig, S_OK, S_ERROR from DIRAC.Resources.Storage.StorageBase import StorageBase from DIRAC.Core.Security.ProxyInfo import getProxyInfo @@ -10,13 +22,16 @@ from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Utilities.File import getSize +from DIRAC.Core.Utilities.Subprocess import shellCall from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient -from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE -import types, re, os, time - +################################################################################ class SRM2Storage( StorageBase ): + """ + .. class:: SRM2Storage + + """ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath ): self.isok = True @@ -32,13 +47,15 @@ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath self.wspath = wspath self.spaceToken = spaceToken self.cwd = self.path + StorageBase.__init__( self, self.name, self.path ) self.timeout = 100 self.long_timeout = 1200 self.stageTimeout = gConfig.getValue( '/Resources/StorageElements/StageTimeout', 12 * 60 * 60 ) - self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 ) + self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 ) self.filesPerCall = gConfig.getValue( '/Resources/StorageElements/FilesPerCall', 20 ) + self.uploadBlockSizeMB = gConfig.getValue( '/Resources/StorageElements/UploadBlockSizeMB', 32 ) # setting some variables for use with lcg_utils self.nobdii = 1 @@ -311,9 +328,9 @@ def getTransportURL( self, path, protocols = False ): if not protocols['OK']: return protocols listProtocols = protocols['Value'] - elif type( protocols ) == types.StringType: + elif type( protocols ) == StringType: listProtocols = [protocols] - elif type( protocols ) == types.ListType: + elif type( protocols ) == ListType: listProtocols = protocols else: return S_ERROR( "SRM2Storage.getTransportURL: Must supply desired protocols to this plug-in." ) @@ -688,8 +705,28 @@ def __putFile( self, src_file, dest_url, sourceSize ): nbstreams = 1 gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams ) gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) + + ## local file? + if src_url.startswith("file:"): + ## create pipe if file size is bigger than 32MB + pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) + try: + if sourceSize > 33554432: + os.mkfifo( pipeName ) + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%sM &" % ( src_file, pipeName, self.uploadBlockSizeMB ), timeout = 10 ) + if ret["OK"]: + gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) + src_url = "file:%s" % pipeName + except OSError, error: + gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) + res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url, srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc ) + + ## remove pipe + if os.path.exists( pipeName ): + os.unlink( pipeName ) + if not res['OK']: # Remove the failed replica, just in case result = self.__executeOperation( dest_url, 'removeFile' ) @@ -737,16 +774,16 @@ def __lcg_cp_wrapper( self, src_url, dest_url, srctype, dsttype, nbstreams, dsttype, self.nobdii, self.voName, nbstreams, self.conf_file, self.insecure, self.verbose, timeout, src_spacetokendesc, dest_spacetokendesc ) - if type( errCode ) not in [types.IntType]: + if type( errCode ) != IntType: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer", "%s %s" % ( errCode, type( errCode ) ) ) - if type( errCode ) in [types.ListType]: + if type( errCode ) != ListType: msg = [] for err in errCode: msg.append( '%s of type %s' % ( err, type( err ) ) ) gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was List:\n" , "\n".join( msg ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer" ) - if type( errStr ) not in types.StringTypes: + if type( errStr ) not in StringTypes: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string", "%s %s" % ( errCode, type( errStr ) ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string" ) @@ -1317,13 +1354,13 @@ def __removeSubDirectories( self, subDirectories ): return resDict def checkArgumentFormat( self, path ): - if type( path ) in types.StringTypes: + if type( path ) in StringTypes: urls = {path:False} - elif type( path ) == types.ListType: + elif type( path ) == ListType: urls = {} for url in path: urls[url] = False - elif type( path ) == types.DictType: + elif type( path ) == DictType: urls = path else: return S_ERROR( "SRM2Storage.checkArgumentFormat: Supplied path is not of the correct format." )