From 0a65f4a7171bb29541f9e1a2afc535f880d4877d Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Tue, 24 Apr 2012 10:29:12 +0200 Subject: [PATCH 01/14] RequestAgentBase: change self.info to self.log.info in resetRequest, ProcessPool: new methods: stop- and StartProcessing --- Core/Utilities/ProcessPool.py | 14 ++++++++++++++ DataManagementSystem/private/RequestAgentBase.py | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 694c1ffbabe..9d278514381 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -445,6 +445,20 @@ def __init__( self, minSize = 2, maxSize = 0, maxQueuedRequests = 10, self.__killIdle = [] self.__killPeriodStart = time.time() + def stopProcessing( self ): + """ case fire + + :param self: self reference + """ + self.finalize() + + def startProcessing( self ): + """ restrat processing again + + :param self: self reference + """ + self.__draining = False + def setPoolCallback( self, callback ): """ set ProcessPool callback function diff --git a/DataManagementSystem/private/RequestAgentBase.py b/DataManagementSystem/private/RequestAgentBase.py index 243ac237637..1616b98eb4d 100644 --- a/DataManagementSystem/private/RequestAgentBase.py +++ b/DataManagementSystem/private/RequestAgentBase.py @@ -135,7 +135,7 @@ def resetRequests( self ): :param self: self reference """ - self.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) + self.log.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) for requestName, requestTuple in self.__requestHolder.items(): requestString, requestServer = requestTuple reset = self.requestClient().updateRequest( requestName, requestString, requestServer ) From 8fbfebfd5995cd3429a5dbbf7f4e0789f9c0c13a Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Tue, 24 Apr 2012 16:01:52 +0200 Subject: [PATCH 02/14] ProcessPool: __del__ slot, SRM2Storage: add pipes to putFile op --- Resources/Storage/SRM2Storage.py | 56 +++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 49b72565e1f..cbb21afa391 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -1,7 +1,20 @@ +################################################################################ +# $HeadURL$ +################################################################################ + """ This is the SRM2 StorageClass """ __RCSID__ = "$Id$" +## imports +import os +import re +import time +import tempfile +import command +from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE +from types import ListType, StringTypes, StringType, DictType, IntType +## from DIRAC from DIRAC import gLogger, gConfig, S_OK, S_ERROR from DIRAC.Resources.Storage.StorageBase import StorageBase from DIRAC.Core.Security.ProxyInfo import getProxyInfo @@ -10,13 +23,16 @@ from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Utilities.File import getSize +from DIRAC.Core.Utilities.Subprocess import shellCall from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient -from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE -import types, re, os, time - +################################################################################ class SRM2Storage( StorageBase ): + """ + .. class:: SRM2Storage + + """ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath ): self.isok = True @@ -311,9 +327,9 @@ def getTransportURL( self, path, protocols = False ): if not protocols['OK']: return protocols listProtocols = protocols['Value'] - elif type( protocols ) == types.StringType: + elif type( protocols ) == StringType: listProtocols = [protocols] - elif type( protocols ) == types.ListType: + elif type( protocols ) == ListType: listProtocols = protocols else: return S_ERROR( "SRM2Storage.getTransportURL: Must supply desired protocols to this plug-in." ) @@ -688,8 +704,26 @@ def __putFile( self, src_file, dest_url, sourceSize ): nbstreams = 1 gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams ) gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) + + ## create pipe + pipeName = "%s/%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) + useFIFO = False + try: + os.mkfifo( pipeName ) + ret = shellCall( "dd if=%s of=%s bs=%s" % ( fifoName, src_file, "32M" ) ) + useFIFO = True + except OSError, error: + gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) + + if useFIFO: + src_url = "file:%s" % pipeName + res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url, srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc ) + + if useFIFO: + os.unlink( pipeName ) + if not res['OK']: # Remove the failed replica, just in case result = self.__executeOperation( dest_url, 'removeFile' ) @@ -737,16 +771,16 @@ def __lcg_cp_wrapper( self, src_url, dest_url, srctype, dsttype, nbstreams, dsttype, self.nobdii, self.voName, nbstreams, self.conf_file, self.insecure, self.verbose, timeout, src_spacetokendesc, dest_spacetokendesc ) - if type( errCode ) not in [types.IntType]: + if type( errCode ) != IntType: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer", "%s %s" % ( errCode, type( errCode ) ) ) - if type( errCode ) in [types.ListType]: + if type( errCode ) != ListType: msg = [] for err in errCode: msg.append( '%s of type %s' % ( err, type( err ) ) ) gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was List:\n" , "\n".join( msg ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer" ) - if type( errStr ) not in types.StringTypes: + if type( errStr ) not in StringTypes: gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string", "%s %s" % ( errCode, type( errStr ) ) ) return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string" ) @@ -1317,13 +1351,13 @@ def __removeSubDirectories( self, subDirectories ): return resDict def checkArgumentFormat( self, path ): - if type( path ) in types.StringTypes: + if type( path ) in StringTypes: urls = {path:False} - elif type( path ) == types.ListType: + elif type( path ) == ListType: urls = {} for url in path: urls[url] = False - elif type( path ) == types.DictType: + elif type( path ) == DictType: urls = path else: return S_ERROR( "SRM2Storage.checkArgumentFormat: Supplied path is not of the correct format." ) From be5a90f0ba2c02806ddf6cd0e99c3d93f88d3bf9 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Tue, 24 Apr 2012 16:39:20 +0200 Subject: [PATCH 03/14] del ProcessPool.__del__, it's already in --- Core/Utilities/ProcessPool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 9d278514381..8b45c1116c8 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -765,7 +765,7 @@ def __filicide( self ): def daemonize( self ): """ Make ProcessPool a finite being for opening and closing doors between chambers. Also just run it in a separate background thread to the death of PID 0. - + :param self: self reference """ if self.__daemonProcess: @@ -776,7 +776,7 @@ def daemonize( self ): def __backgroundProcess( self ): """ daemon thread target - + :param self: self reference """ while True: From 5400f1bc66192625881af174b68e9cfe821b2c6c Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Tue, 24 Apr 2012 16:48:43 +0200 Subject: [PATCH 04/14] dd if of --- Resources/Storage/SRM2Storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index cbb21afa391..bba4b9ffc57 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -710,7 +710,7 @@ def __putFile( self, src_file, dest_url, sourceSize ): useFIFO = False try: os.mkfifo( pipeName ) - ret = shellCall( "dd if=%s of=%s bs=%s" % ( fifoName, src_file, "32M" ) ) + ret = shellCall( "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ) ) useFIFO = True except OSError, error: gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) From 7ff435550ad04cc157dd37a5dc281d5fca9e6934 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Wed, 25 Apr 2012 09:21:16 +0200 Subject: [PATCH 05/14] small fix --- Resources/Storage/SRM2Storage.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index bba4b9ffc57..2030df82e91 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -11,7 +11,6 @@ import re import time import tempfile -import command from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE from types import ListType, StringTypes, StringType, DictType, IntType ## from DIRAC @@ -710,8 +709,10 @@ def __putFile( self, src_file, dest_url, sourceSize ): useFIFO = False try: os.mkfifo( pipeName ) - ret = shellCall( "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ) ) - useFIFO = True + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ), timeout = 10 ) + if ret["OK"]: + useFIFO = True + gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) except OSError, error: gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) From ccc93ac84266828d86a8bbaea1141e4cc92bba1f Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Wed, 25 Apr 2012 14:47:23 +0200 Subject: [PATCH 06/14] ProcessPool: worker will commit suicide when parent is dead --- Core/Utilities/ProcessPool.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 8b45c1116c8..c77b24655a5 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -170,7 +170,11 @@ def run( self ): lr = LockRing() lr._openAll() lr._setAllEvents() + while True: + ## commit suicide when parent has died + if os.getppid() == 1: + return try: task = self.__pendingQueue.get( block = True, timeout = 10 ) except Queue.Empty: @@ -181,7 +185,7 @@ def run( self ): try: task.process() if task.hasCallback() or task.usePoolCallbacks(): - self.__resultsQueue.put( task, block = True ) + self.__resultsQueue.put( task, block = True, timeout = 60 ) finally: self.__working.value = 0 From 785ef8d3aeba686c0dccdd0c249995cce8296ff1 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Wed, 25 Apr 2012 15:20:10 +0200 Subject: [PATCH 07/14] SRM2Storage: small fix in temp pipe name --- Resources/Storage/SRM2Storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 2030df82e91..d30e6d109b3 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -705,7 +705,7 @@ def __putFile( self, src_file, dest_url, sourceSize ): gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) ## create pipe - pipeName = "%s/%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) + pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) useFIFO = False try: os.mkfifo( pipeName ) From 376e39f31ce55f03759af3052467f7881e03ad93 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Wed, 25 Apr 2012 16:29:38 +0200 Subject: [PATCH 08/14] SRM2Storage: create pipes only for files bigger than 32MB --- Resources/Storage/SRM2Storage.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index d30e6d109b3..1f15bc941c2 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -704,15 +704,16 @@ def __putFile( self, src_file, dest_url, sourceSize ): gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams ) gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) - ## create pipe + ## create pipe if file size is bigger than 32MB pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) useFIFO = False try: - os.mkfifo( pipeName ) - ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ), timeout = 10 ) - if ret["OK"]: - useFIFO = True - gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) + if sourceSize > 33554432: + os.mkfifo( pipeName ) + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s &" % ( src_file, pipeName, "32M" ), timeout = 10 ) + if ret["OK"]: + useFIFO = True + gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) except OSError, error: gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) From 5b5d10c093d8955cfc1dc010b8b1a43ac6e8d18d Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Thu, 26 Apr 2012 12:07:31 +0200 Subject: [PATCH 09/14] reverting changes in ProcessPool, this development is now in DEV-ProcessPool branch --- Core/Utilities/ProcessPool.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index c77b24655a5..8b45c1116c8 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -170,11 +170,7 @@ def run( self ): lr = LockRing() lr._openAll() lr._setAllEvents() - while True: - ## commit suicide when parent has died - if os.getppid() == 1: - return try: task = self.__pendingQueue.get( block = True, timeout = 10 ) except Queue.Empty: @@ -185,7 +181,7 @@ def run( self ): try: task.process() if task.hasCallback() or task.usePoolCallbacks(): - self.__resultsQueue.put( task, block = True, timeout = 60 ) + self.__resultsQueue.put( task, block = True ) finally: self.__working.value = 0 From 596985d93732378eae7d5b947f4803df47938280 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Thu, 26 Apr 2012 13:56:14 +0200 Subject: [PATCH 10/14] SRM2Storage: remove obsolete useFIFO flag --- Resources/Storage/SRM2Storage.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 1f15bc941c2..4c3df2bc400 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -706,24 +706,21 @@ def __putFile( self, src_file, dest_url, sourceSize ): ## create pipe if file size is bigger than 32MB pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) - useFIFO = False try: if sourceSize > 33554432: os.mkfifo( pipeName ) ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s &" % ( src_file, pipeName, "32M" ), timeout = 10 ) if ret["OK"]: - useFIFO = True gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) + src_url = "file:%s" % pipeName except OSError, error: gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) - if useFIFO: - src_url = "file:%s" % pipeName - res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url, srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc ) - if useFIFO: + ## remove pipe + if os.path.exists( pipeName ): os.unlink( pipeName ) if not res['OK']: From 1c44c3dd6cb892d2843be4803f0b407d708cc379 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Thu, 26 Apr 2012 22:22:27 +0200 Subject: [PATCH 11/14] save --- Resources/Storage/SRM2Storage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 4c3df2bc400..a3eb4fc00f5 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -47,6 +47,7 @@ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath self.wspath = wspath self.spaceToken = spaceToken self.cwd = self.path + StorageBase.__init__( self, self.name, self.path ) self.timeout = 100 From 5b06943bcd167eb53b4f82410d370d7abca85073 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Thu, 26 Apr 2012 22:35:06 +0200 Subject: [PATCH 12/14] reverting PP and RAB --- Core/Utilities/ProcessPool.py | 18 ++---------------- .../private/RequestAgentBase.py | 2 +- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/Core/Utilities/ProcessPool.py b/Core/Utilities/ProcessPool.py index 8b45c1116c8..694c1ffbabe 100644 --- a/Core/Utilities/ProcessPool.py +++ b/Core/Utilities/ProcessPool.py @@ -445,20 +445,6 @@ def __init__( self, minSize = 2, maxSize = 0, maxQueuedRequests = 10, self.__killIdle = [] self.__killPeriodStart = time.time() - def stopProcessing( self ): - """ case fire - - :param self: self reference - """ - self.finalize() - - def startProcessing( self ): - """ restrat processing again - - :param self: self reference - """ - self.__draining = False - def setPoolCallback( self, callback ): """ set ProcessPool callback function @@ -765,7 +751,7 @@ def __filicide( self ): def daemonize( self ): """ Make ProcessPool a finite being for opening and closing doors between chambers. Also just run it in a separate background thread to the death of PID 0. - + :param self: self reference """ if self.__daemonProcess: @@ -776,7 +762,7 @@ def daemonize( self ): def __backgroundProcess( self ): """ daemon thread target - + :param self: self reference """ while True: diff --git a/DataManagementSystem/private/RequestAgentBase.py b/DataManagementSystem/private/RequestAgentBase.py index 1616b98eb4d..243ac237637 100644 --- a/DataManagementSystem/private/RequestAgentBase.py +++ b/DataManagementSystem/private/RequestAgentBase.py @@ -135,7 +135,7 @@ def resetRequests( self ): :param self: self reference """ - self.log.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) + self.info("resetRequest: will put %s back requests" % len(self.__requestHolder) ) for requestName, requestTuple in self.__requestHolder.items(): requestString, requestServer = requestTuple reset = self.requestClient().updateRequest( requestName, requestString, requestServer ) From f667bfefa8b31ed8f0bebf64f9936566c6b3b2b5 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Fri, 27 Apr 2012 14:49:44 +0200 Subject: [PATCH 13/14] new CS option for setting dd bs size, default = 32MB --- Resources/Storage/SRM2Storage.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index a3eb4fc00f5..c938cccfe3c 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -53,8 +53,9 @@ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath self.timeout = 100 self.long_timeout = 1200 self.stageTimeout = gConfig.getValue( '/Resources/StorageElements/StageTimeout', 12 * 60 * 60 ) - self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 ) + self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 ) self.filesPerCall = gConfig.getValue( '/Resources/StorageElements/FilesPerCall', 20 ) + self.uploadBlockSizeMB = gConfig.getValue( '/Resources/StorageElements/UploadBlockSizeMB', 32 ) # setting some variables for use with lcg_utils self.nobdii = 1 @@ -710,7 +711,7 @@ def __putFile( self, src_file, dest_url, sourceSize ): try: if sourceSize > 33554432: os.mkfifo( pipeName ) - ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s &" % ( src_file, pipeName, "32M" ), timeout = 10 ) + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%sM &" % ( src_file, pipeName, self.uploadBlockSizeMB ), timeout = 10 ) if ret["OK"]: gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) src_url = "file:%s" % pipeName From 284548aa966bb7cbb44e18c55c40ada433edf62b Mon Sep 17 00:00:00 2001 From: Krzysztof Ciba Date: Wed, 2 May 2012 13:14:13 +0200 Subject: [PATCH 14/14] SRM2Storage: make sure src_file is local for creating pipe --- Resources/Storage/SRM2Storage.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index c938cccfe3c..b98eee0034f 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -705,18 +705,20 @@ def __putFile( self, src_file, dest_url, sourceSize ): nbstreams = 1 gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams ) gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) ) - - ## create pipe if file size is bigger than 32MB - pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) - try: - if sourceSize > 33554432: - os.mkfifo( pipeName ) - ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%sM &" % ( src_file, pipeName, self.uploadBlockSizeMB ), timeout = 10 ) - if ret["OK"]: - gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) - src_url = "file:%s" % pipeName - except OSError, error: - gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) + + ## local file? + if src_url.startswith("file:"): + ## create pipe if file size is bigger than 32MB + pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) ) + try: + if sourceSize > 33554432: + os.mkfifo( pipeName ) + ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%sM &" % ( src_file, pipeName, self.uploadBlockSizeMB ), timeout = 10 ) + if ret["OK"]: + gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName ) + src_url = "file:%s" % pipeName + except OSError, error: + gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) ) res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url, srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc )