Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,630 changes: 900 additions & 730 deletions DataManagementSystem/Client/ReplicaManager.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
########################################################################
# $HeadURL: $
########################################################################
__RCSID__ = "$Id: $"
__RCSID__ = "$Id$"

from DIRAC.Core.Base import Script

Expand All @@ -16,16 +16,16 @@
Script.parseCommandLine()

import DIRAC
from DIRAC import gLogger,S_OK,S_ERROR
from DIRAC.Core.Utilities.List import sortList
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.List import sortList
from LHCbDIRAC.DataManagementSystem.Client.DataIntegrityClient import DataIntegrityClient
integrityClient = DataIntegrityClient()

def resolveTransforamtionProblematics(transID):
def resolveTransforamtionProblematics( transID ):
gLogger.notice("Obtaining problematic files for transformation %d" % transID)
res = integrityClient.getTransformationProblematics(transID)
if not res['OK']:
gLogger.error("Failed to get transformation problematic files",res['Message'])
gLogger.error("Failed to get transformation problematic files", res['Message'])
return S_ERROR()
problematicFiles = res['Value']
if not problematicFiles:
Expand All @@ -34,15 +34,20 @@ def resolveTransforamtionProblematics(transID):
for lfn in sortList(problematicFiles.keys()):
prognosis = problematicFiles[lfn]['Prognosis']
problematicDict = problematicFiles[lfn]
execString = "res = integrityClient.resolve%s(problematicDict)" % prognosis
try:
exec(execString)
except AttributeError:
gLogger.error("Resolution method for %s not available" % prognosis)
gLogger.notice("Prognosis is %s" % prognosis )
if not hasattr( integrityClient, methodToCall ):
gLogger.notice( "DataIntegrityClient hasn't got '%s' member" % methodToCall )
continue
fcn = getattr( integrityClient, methodToCall )
if not callable( fcn ):
gLogger.notice( "DataIntegrityClient member '%s' isn't a method" % methodToCall )
continue
## results not checked??? Where is The Food?
res = fcn( problematicDict )
gLogger.notice("Problematic files resolved for transformation %d" % transID)
return S_OK()

transIDs = [int(x) for x in Script.getPositionalArgs()]
transIDs = [ int(x) for x in Script.getPositionalArgs() ]

if not transIDs:
gLogger.notice("Please supply transformationIDs as arguments")
Expand Down
28 changes: 13 additions & 15 deletions Resources/Catalog/LcgFileCatalogClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,21 +1072,19 @@ def __checkArgumentFormat( self, path ):
def __executeOperation( self, path, method ):
""" Executes the requested functionality with the supplied path
"""
execString = "res = self.%s(path)" % method
try:
exec( execString )
if type( path ) == types.DictType:
path = path.keys()[0]
if not res['OK']:
return res
elif not res['Value']['Successful'].has_key( path ):
return S_ERROR( res['Value']['Failed'][path] )
else:
return S_OK( res['Value']['Successful'][path] )
except AttributeError, errMessage:
exceptStr = "LcgFileCatalogClient.__executeOperation: Exception while perfoming %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )
fcn = None
if hasattr( self, method ) and callable( getattr(self, method) ):
fcn = getattr( self, method )
if not fcn:
return S_ERROR("Unable to invoke %s, it isn't a member function of LcgFileCatalogClient" % method )
res = fcn( path )
if type( path ) == types.DictType:
path = path.keys()[0]
if not res['OK']:
return res
elif path not in res['Value']['Successful']:
return S_ERROR( res['Value']['Failed'][path] )
return S_OK( res['Value']['Successful'][path] )

def __getLFNForPFN( self, pfn ):
fstat = lfc.lfc_filestatg()
Expand Down
28 changes: 15 additions & 13 deletions Resources/Storage/DIPStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,16 +589,18 @@ def __checkArgumentFormatDict( self, path ):
def __executeOperation( self, url, method ):
""" Executes the requested functionality with the supplied url
"""
execString = "res = self.%s(url)" % method
try:
exec( execString )
if not res['OK']:
return S_ERROR( res['Message'] )
elif not res['Value']['Successful'].has_key( url ):
return S_ERROR( res['Value']['Failed'][url] )
else:
return S_OK( res['Value']['Successful'][url] )
except AttributeError, errMessage:
exceptStr = "DIPStorage.__executeOperation: Exception while perfoming %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )
fcn = None
if hasattr( self, method ) and callable( getattr( self, method ) ):
fcn = getattr( self, method )
if not fcn:
return S_ERROR("Unable to invoke %s, it isn't a member function of DIPStorage" % method )

res = fcn( url )
if not res['OK']:
return res
elif url not in res['Value']['Successful']:
return S_ERROR( res['Value']['Failed'][url] )

return S_OK( res['Value']['Successful'][url] )


30 changes: 16 additions & 14 deletions Resources/Storage/ProxyStorage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
""" This is the Proxy storage element client """
#################################################################################
# $HeadURL $
#################################################################################

""" This is the Proxy storage element client """

__RCSID__ = "$Id$"

Expand Down Expand Up @@ -227,16 +230,15 @@ def __checkArgumentFormatDict( self, path ):
def __executeOperation( self, url, method ):
""" Executes the requested functionality with the supplied url
"""
execString = "res = self.%s(url)" % method
try:
exec( execString )
if not res['OK']:
return S_ERROR( res['Message'] )
elif not res['Value']['Successful'].has_key( url ):
return S_ERROR( res['Value']['Failed'][url] )
else:
return S_OK( res['Value']['Successful'][url] )
except AttributeError, errMessage:
exceptStr = "ProxyStorage.__executeOperation: Exception while perfoming %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )
fcn = None
if hasattr( self, method ) and callable( getattr( self, method ) ):
fcn = getattr( self, method )
if not fcn:
return S_ERROR("Unable to invoke %s, it isn't a member function of ProxyStorage" % method )
res = fcn( url )
if not res['OK']:
return res
elif url not in res['Value']['Successful']:
return S_ERROR( res['Value']['Failed'][url] )
return S_OK( res['Value']['Successful'][url] )

27 changes: 15 additions & 12 deletions Resources/Storage/RFIOStorage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
###########################################################################
# $HeadURL$
###########################################################################

""" This is the RFIO StorageClass """

__RCSID__ = "$Id$"
Expand Down Expand Up @@ -1077,16 +1081,15 @@ def __checkArgumentFormatDict( self, path ):
def __executeOperation( self, url, method ):
""" Executes the requested functionality with the supplied url
"""
execString = "res = self.%s(url)" % method
try:
exec( execString )
if not res['OK']:
return S_ERROR( res['Message'] )
elif not res['Value']['Successful'].has_key( url ):
fcn = None
if hasattr(self, method) and callable( getattr(self, method) ):
fcn = getattr( self, method )
if not fcn:
return S_ERROR("Unable to invoke %s, it isn't a member funtion of RFIOStorage" % method )
res = fcn( url )
if not res['OK']:
return res
elif url not in res['Value']['Successful']:
return S_ERROR( res['Value']['Failed'][url] )
else:
return S_OK( res['Value']['Successful'][url] )
except AttributeError, errMessage:
exceptStr = "RFIOStorage.__executeOperation: Exception while performing %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )
return S_OK( res['Value']['Successful'][url] )

79 changes: 37 additions & 42 deletions Resources/Storage/SRM2Storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,28 +827,27 @@ def __getFile( self, src_url, dest_file ):
def __executeOperation( self, url, method ):
""" Executes the requested functionality with the supplied url
"""
execString = "res = self.%s(url)" % method
try:
exec( execString )
if not res['OK']:
return S_ERROR( res['Message'] )
elif not res['Value']['Successful'].has_key( url ):
if not res['Value']['Failed'].has_key( url ):
if res['Value']['Failed'].values():
return S_ERROR( res['Value']['Failed'].values()[0] )
elif res['Value']['Successful'].values():
return S_OK( res['Value']['Successful'].values()[0] )
else:
gLogger.error( 'Wrong Return structure', str( res['Value'] ) )
return S_ERROR( 'Wrong Return structure' )
return S_ERROR( res['Value']['Failed'][url] )
else:
return S_OK( res['Value']['Successful'][url] )
except AttributeError, errMessage:
exceptStr = "SRM2Storage.__executeOperation: Exception while perfoming %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )
fcn = None
if hasattr( self, method ) and callable( getattr(self, method) ):
fcn = getattr( self, method )
if not fcn:
return S_ERROR("Unable to invoke %s, it isn't a member funtion of SRM2Storage" % method )
res = fcn( url )

if not res['OK']:
return res
elif url not in res['Value']['Successful']:
if url not in res['Value']['Failed']:
if res['Value']['Failed'].values():
return S_ERROR( res['Value']['Failed'].values()[0] )
elif res['Value']['Successful'].values():
return S_OK( res['Value']['Successful'].values()[0] )
else:
gLogger.error( 'Wrong Return structure', str( res['Value'] ) )
return S_ERROR( 'Wrong Return structure' )
return S_ERROR( res['Value']['Failed'][url] )
return S_OK( res['Value']['Successful'][url] )

############################################################################################
#
# Directory based methods
Expand Down Expand Up @@ -1852,27 +1851,23 @@ def __gfal_exec( self, gfalObject, method, timeout_sendreceive = None ):

"""
gLogger.debug( "SRM2Storage.__gfal_exec: Performing %s." % method )
execString = "errCode,gfalObject,errMessage = self.gfal.%s(gfalObject)" % method
try:
if timeout_sendreceive:
# For asynchronous methods this timeout defines how long the connection to set the
# request can take
self.gfal.gfal_set_timeout_sendreceive( timeout_sendreceive )
exec( execString )
if not errCode == 0:
errStr = "SRM2Storage.__gfal_exec: Failed to perform %s." % method
if not errMessage:
errMessage = os.strerror( errCode )
gLogger.error( errStr, errMessage )
return S_ERROR( "%s%s" % ( errStr, errMessage ) )
else:
gLogger.debug( "SRM2Storage.__gfal_exec: Successfully performed %s." % method )
return S_OK( gfalObject )
except AttributeError, errMessage:
exceptStr = "SRM2Storage.__gfal_exec: Exception while perfoming %s." % method
gLogger.exception( exceptStr, '', errMessage )
return S_ERROR( "%s%s" % ( exceptStr, errMessage ) )

fcn = None
if hasattr( self.gfal, method ) and callable( getattr( self.gfal, method) ):
fcn = getattr( self.gfal, method )
if not fcn:
return S_ERROR( "Unable to invoke %s for gfal, it isn't a member function" % method )
if timeout_sendreceive:
self.gfal.gfal_set_timeout_sendreceive( timeout_sendreceive )
errCode, gfalObject, errMessage = fcn( gfalObject )
if errCode:
errStr = "SRM2Storage.__gfal_exec: Failed to perform %s." % method
if not errMessage:
errMessage = os.strerror( errCode )
gLogger.error( errStr, errMessage )
return S_ERROR( "%s%s" % ( errStr, errMessage ) )
gLogger.debug( "SRM2Storage.__gfal_exec: Successfully performed %s." % method )
return S_OK( gfalObject )

# These methods are for retrieving output information

def __get_results( self, gfalObject ):
Expand Down
26 changes: 9 additions & 17 deletions Resources/Storage/StorageElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,26 +607,18 @@ def __executeFunction( self, pfn, method, argsDict = None ):
gLogger.verbose( "StorageElement.__executeFunction No pfns generated for protocol %s." % protocolName )
else:
gLogger.verbose( "StorageElement.__executeFunction: Attempting to perform '%s' for %s physical files." % ( method, len( pfnDict.keys() ) ) )
fcn = None
if hasattr( storage, method ) and callable( getattr( storage, method ) ):
fcn = getattr( storage, method )
if not fcn:
return S_ERROR("StorageElement.__executeFunction: unable to invoke %s, it isn't a member function of storage")

pfnsToUse = {}
for pfn in pfnDict.keys():
pfnsToUse[pfn] = pfns[pfnDict[pfn]]
if argsDict:
execString = "res = storage.%s(pfnsToUse" % method
for argument, value in argsDict.items():
if type( value ) == types.StringType:
execString = "%s, %s='%s'" % ( execString, argument, value )
else:
execString = "%s, %s=%s" % ( execString, argument, value )
execString = "%s)" % execString
else:
execString = "res = storage.%s(pfnsToUse)" % method
try:
exec( execString )
except AttributeError, errMessage:
exceptStr = "StorageElement.__executeFunction: Exception while performing %s." % method
gLogger.exception( exceptStr, str( errMessage ) )
res = S_ERROR( exceptStr )


res = fcn( pfnsToUse, **argsDict )

if not res['OK']:
errStr = "StorageElement.__executeFunction: Completely failed to perform %s." % method
gLogger.error( errStr, '%s for protocol %s: %s' % ( self.name, protocolName, res['Message'] ) )
Expand Down
10 changes: 8 additions & 2 deletions StorageManagementSystem/Agent/RequestFinalizationAgent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
##########################################################################
# $HeadURL$
##########################################################################

__RCSID__ = "$Id$"

Expand Down Expand Up @@ -111,9 +113,13 @@ def __performCallback( self, status, callback, sourceTask ):
gLogger.debug( "RequestFinalization.__performCallback: Attempting to perform call back for %s with %s status" % ( sourceTask, status ) )
client = RPCClient( service )
gLogger.debug( "RequestFinalization.__performCallback: Created RPCClient to %s" % service )
execString = "res = client.%s('%s','%s')" % ( method, sourceTask, status )
fcn = None
if hasattr( client, method ) and callable( getattr( client, method ) ):
fcn = getattr( client, method )
if not fcn:
return S_ERROR( "Unable to invoke %s, it isn't a member funtion of %s" % ( method, service ) )
gLogger.debug( "RequestFinalization.__performCallback: Attempting to invoke %s service method" % method )
exec( execString )
res = fcn( sourceTask, status )
if not res['OK']:
gLogger.error( "RequestFinalization.__performCallback: Failed to perform callback", res['Message'] )
else:
Expand Down
Loading