Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions Resources/Storage/SRM2Storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
################################################################################
# $HeadURL$
################################################################################

""" This is the SRM2 StorageClass """

__RCSID__ = "$Id$"

## imports
import os
import re
import time
import tempfile
from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE
from types import ListType, StringTypes, StringType, DictType, IntType
## from DIRAC
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Resources.Storage.StorageBase import StorageBase
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
Expand All @@ -10,13 +22,16 @@
from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.File import getSize
from DIRAC.Core.Utilities.Subprocess import shellCall
from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient

from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE
import types, re, os, time

################################################################################
class SRM2Storage( StorageBase ):
"""
.. class:: SRM2Storage

"""

def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath ):
self.isok = True
Expand All @@ -32,13 +47,15 @@ def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath
self.wspath = wspath
self.spaceToken = spaceToken
self.cwd = self.path

StorageBase.__init__( self, self.name, self.path )

self.timeout = 100
self.long_timeout = 1200
self.stageTimeout = gConfig.getValue( '/Resources/StorageElements/StageTimeout', 12 * 60 * 60 )
self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 )
self.fileTimeout = gConfig.getValue( '/Resources/StorageElements/FileTimeout', 30 )
self.filesPerCall = gConfig.getValue( '/Resources/StorageElements/FilesPerCall', 20 )
self.uploadBlockSizeMB = gConfig.getValue( '/Resources/StorageElements/UploadBlockSizeMB', 32 )

# setting some variables for use with lcg_utils
self.nobdii = 1
Expand Down Expand Up @@ -311,9 +328,9 @@ def getTransportURL( self, path, protocols = False ):
if not protocols['OK']:
return protocols
listProtocols = protocols['Value']
elif type( protocols ) == types.StringType:
elif type( protocols ) == StringType:
listProtocols = [protocols]
elif type( protocols ) == types.ListType:
elif type( protocols ) == ListType:
listProtocols = protocols
else:
return S_ERROR( "SRM2Storage.getTransportURL: Must supply desired protocols to this plug-in." )
Expand Down Expand Up @@ -688,8 +705,28 @@ def __putFile( self, src_file, dest_url, sourceSize ):
nbstreams = 1
gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams )
gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) )

## local file?
if src_url.startswith("file:"):
## create pipe if file size is bigger than 32MB
pipeName = "%s-%s" % ( tempfile.mktemp(), os.path.basename( src_file ) )
try:
if sourceSize > 33554432:
os.mkfifo( pipeName )
ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%sM &" % ( src_file, pipeName, self.uploadBlockSizeMB ), timeout = 10 )
if ret["OK"]:
gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName )
src_url = "file:%s" % pipeName
except OSError, error:
gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) )

res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url,
srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc )

## remove pipe
if os.path.exists( pipeName ):
os.unlink( pipeName )

if not res['OK']:
# Remove the failed replica, just in case
result = self.__executeOperation( dest_url, 'removeFile' )
Expand Down Expand Up @@ -737,16 +774,16 @@ def __lcg_cp_wrapper( self, src_url, dest_url, srctype, dsttype, nbstreams,
dsttype, self.nobdii, self.voName, nbstreams, self.conf_file,
self.insecure, self.verbose, timeout, src_spacetokendesc,
dest_spacetokendesc )
if type( errCode ) not in [types.IntType]:
if type( errCode ) != IntType:
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer",
"%s %s" % ( errCode, type( errCode ) ) )
if type( errCode ) in [types.ListType]:
if type( errCode ) != ListType:
msg = []
for err in errCode:
msg.append( '%s of type %s' % ( err, type( err ) ) )
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was List:\n" , "\n".join( msg ) )
return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer" )
if type( errStr ) not in types.StringTypes:
if type( errStr ) not in StringTypes:
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string",
"%s %s" % ( errCode, type( errStr ) ) )
return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string" )
Expand Down Expand Up @@ -1317,13 +1354,13 @@ def __removeSubDirectories( self, subDirectories ):
return resDict

def checkArgumentFormat( self, path ):
if type( path ) in types.StringTypes:
if type( path ) in StringTypes:
urls = {path:False}
elif type( path ) == types.ListType:
elif type( path ) == ListType:
urls = {}
for url in path:
urls[url] = False
elif type( path ) == types.DictType:
elif type( path ) == DictType:
urls = path
else:
return S_ERROR( "SRM2Storage.checkArgumentFormat: Supplied path is not of the correct format." )
Expand Down