diff --git a/DataManagementSystem/Client/ReplicaManager.py b/DataManagementSystem/Client/ReplicaManager.py index 114e5f4ba13..adc799dc621 100644 --- a/DataManagementSystem/Client/ReplicaManager.py +++ b/DataManagementSystem/Client/ReplicaManager.py @@ -1,901 +1,1037 @@ -""" This is the Replica Manager which links the functionalities of StorageElement and FileCatalog. """ +#################################################################################### +# $HeadURL$ +#################################################################################### + +""" :mod: ReplicaManager + ======================= + + .. module: ReplicaManager + :synopsis: ReplicaManager links the functionalities of StorageElement and FileCatalog. + + This module consists ReplicaManager and related classes. + +""" __RCSID__ = "$Id$" -import re, time, commands, random, os, fnmatch -import types +## imports +import commands from datetime import datetime, timedelta +import fnmatch +import os +import random +import re +import time +from types import StringTypes, ListType, DictType, StringType, TupleType +## from DIRAC import DIRAC - -from DIRAC import S_OK, S_ERROR, gLogger, gConfig +from DIRAC import S_OK, S_ERROR, gLogger, gConfig from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation -from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient -from DIRAC.ResourceStatusSystem.Client import ResourceStatus -from DIRAC.Core.Utilities.File import makeGuid, getSize -from DIRAC.Core.Utilities.Adler import fileAdler, compareAdler -from DIRAC.Core.Utilities.List import sortList, randomize -from DIRAC.Core.Utilities.SiteSEMapping import getSEsForSite, isSameSiteSE, getSEsForCountry -from DIRAC.Resources.Storage.StorageElement import StorageElement -from DIRAC.Resources.Catalog.FileCatalog import FileCatalog +from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient +from DIRAC.ResourceStatusSystem.Client import ResourceStatus +from DIRAC.Core.Utilities.File import makeGuid, getSize +from DIRAC.Core.Utilities.Adler import fileAdler, compareAdler +from DIRAC.Core.Utilities.List import sortList, randomize +from DIRAC.Core.Utilities.SiteSEMapping import getSEsForSite, isSameSiteSE, getSEsForCountry +from DIRAC.Resources.Storage.StorageElement import StorageElement +from DIRAC.Resources.Catalog.FileCatalog import FileCatalog class CatalogBase: - - def __init__( self ): - """ This class stores the two wrapper functions for interacting with the FileCatalog: - - _executeFileCatalogFunction(lfn,method,argsDict={},catalogs=[]) - Is a wrapper around the available FileCatlog() functions. - The 'lfn' and 'method' arguments must be provided: - 'lfn' contains a single file string or a list or dictionary containing the required files. - 'method' is the name of the FileCatalog() to be invoked. - 'argsDict' contains aditional arguments that are requred for the method. - 'catalogs' is the list of catalogs the operation is to be performed on. - By default this is all available catalogs. - Examples are 'LcgFileCatalogCombined', 'BookkeepingDB', 'ProductionDB' - - _executeSingleFileCatalogFunction(lfn,method,argsDict={},catalogs=[]) - Is a wrapper around _executeFileCatalogFunction(). - It parses the output of _executeFileCatalogFunction() for the first file provided as input. - If this file is found in: - res['Value']['Successful'] an S_OK() is returned with the value. - res['Value']['Failed'] an S_ERROR() is returned with the error message. - """ - pass - - def _executeSingleFileCatalogFunction( self, lfn, method, argsDict = {}, catalogs = [] ): - res = self._executeFileCatalogFunction( lfn, method, argsDict, catalogs = catalogs ) - if type( lfn ) == types.ListType: - singleLfn = lfn[0] - elif type( lfn ) == types.DictType: - singleLfn = lfn.keys()[0] - else: - singleLfn = lfn - if not res['OK']: + """ + .. class:: CatalogBase + + This class stores the two wrapper functions for interacting with the FileCatalog. + """ + def _callFileCatalogFcnSingleFile( self, lfn, method, argsDict=None, catalogs=None ): + """ A wrapper around :CatalogBase._callFileCatalogFcn: for a single file. It parses + the output of :CatalogBase_callFileCatalogFcn: for the first file provided as input. + If this file is found in:: + + * res['Value']['Successful'] an S_OK() is returned with the value. + * res['Value']['Failed'] an S_ERROR() is returned with the error message. + + :warning: this function is executed only for the first LFN provided, in case of dict of LFNs + the order of keys are NOT preserved, so the output is undefined + + :param self: self reference + :param mixed lfn: LFN as string or list with LFNs or dict with LFNs as keys + :param str method: :FileCatalog: method name + :param dict argsDict: kwargs for method + :param list catalogs: list with catalog names + """ + ## default values + argsDict = argsDict if argsDict else dict() + catalogs = catalogs if catalogs else list() + ## checjk type + if not lfn or type(lfn) not in ( ListType, StringTypes, DictType ): + return S_ERROR( "wrong type (%s) for argument 'lfn'" % type(lfn) ) + singleLfn = lfn + if type( lfn ) == ListType: + singleLfn = lfn[0] + elif type( lfn ) == DictType: + singleLfn = lfn.keys()[0] + ## call only for single lfn + res = self._callFileCatalogFcn( singleLfn, method, argsDict, catalogs = catalogs ) + if not res["OK"]: return res - elif res['Value']['Failed'].has_key( singleLfn ): - errorMessage = res['Value']['Failed'][singleLfn] - return S_ERROR( errorMessage ) - else: - return S_OK( res['Value']['Successful'][singleLfn] ) + elif singleLfn in res["Value"]["Failed"]: + return S_ERROR( res["Value"]["Failed"][singleLfn] ) + return S_OK( res["Value"]["Successful"][singleLfn] ) - def _executeFileCatalogFunction( self, lfn, method, argsDict = {}, catalogs = [] ): + def _callFileCatalogFcn( self, lfn, method, argsDict = None, catalogs = None ): """ A simple wrapper around the file catalog functionality - """ - # First check the supplied lfn(s) are the correct format. - if type( lfn ) in types.StringTypes: - lfns = {lfn:False} - elif type( lfn ) == types.ListType: - lfns = {} - for lfn in lfn: - lfns[lfn] = False - elif type( lfn ) == types.DictType: - lfns = lfn.copy() - else: - errStr = "ReplicaManager._executeFileCatalogFunction: Supplied lfns must be string or list of strings or a dictionary." + + This is a wrapper around the available :FileCatalog: functions. + The :lfn: and :method: arguments must be provided. + + :param self: self reference + :param mixed lfn: a single LFN string or a list of LFNs or dictionary with LFNs stored as keys. + :param str method: name of the FileCatalog function to be invoked + :param dict argsDict: aditional keyword arguments that are requred for the :method: + :param list catalogs: list of catalogs the operation is to be performed on, by default this + is all available catalogs; examples are 'LcgFileCatalogCombined', 'BookkeepingDB', + 'ProductionDB' etc. + """ + ## default values + argsDict = argsDict if argsDict else dict() + catalogs = catalogs if catalogs else list() + lfns = None + if not lfn or type(lfn) not in ( StringTypes, ListType, DictType ): + errStr = "ReplicaManager._callFileCatalogFcn: Wrong 'lfn' argument." gLogger.error( errStr ) return S_ERROR( errStr ) - # Check we have some lfns + elif type( lfn ) in StringTypes: + lfns = { lfn : False } + elif type( lfn ) == ListType: + lfns = dict.fromkeys( lfn, False ) + elif type( lfn ) == DictType: + lfns = lfn.copy() + ## lfns supplied? if not lfns: - errMessage = "ReplicaManager._executeFileCatalogFunction: No lfns supplied." - gLogger.error( errMessage ) - return S_ERROR( errMessage ) - gLogger.debug( "ReplicaManager._executeFileCatalogFunction: Attempting to perform '%s' operation with %s lfns." % ( method, len( lfns ) ) ) - # Check we can instantiate the file catalog correctly - fileCatalog = FileCatalog( catalogs ) - # Generate the execution string - if argsDict: - execString = "res = fileCatalog.%s(lfns" % method - for argument, value in argsDict.items(): - if type( value ) == types.StringType: - execString = "%s, %s='%s'" % ( execString, argument, value ) - else: - execString = "%s, %s=%s" % ( execString, argument, value ) - execString = "%s)" % execString - else: - execString = "res = fileCatalog.%s(lfns)" % method - # Execute the execute string - try: - exec( execString ) - except AttributeError, errMessage: - exceptStr = "ReplicaManager._executeFileCatalogFunction: Exception while perfoming %s." % method - gLogger.exception( exceptStr, str( errMessage ) ) - return S_ERROR( exceptStr ) - # Return the output - if not res['OK']: - errStr = "ReplicaManager._executeFileCatalogFunction: Completely failed to perform %s." % method - gLogger.error( errStr, res['Message'] ) + errMsg = "ReplicaManager._callFileCatalogFcn: No lfns supplied." + gLogger.error( errMsg ) + return S_ERROR( errMsg ) + gLogger.debug( "ReplicaManager._callFileCatalogFcn: Will execute '%s' method with %s lfns." % ( method, + len(lfns) ) ) + ## create FileCatalog instance + fileCatalog = FileCatalog( catalogs=catalogs ) + ## get symbol + fcFcn = getattr( fileCatalog, method ) if hasattr( fileCatalog, method ) else None + ## check if it is callable + fcFcn = fcFcn if callable(fcFcn) else None + if not fcFcn: + errMsg = "ReplicaManager._callFileCatalogFcn: '%s' isn't a member function in FileCatalog." % method + gLogger.error( errMsg ) + return S_ERROR( errMsg ) + ## call it at least + res = fcFcn( lfns, **argsDict ) + if not res["OK"]: + gLogger.error( "ReplicaManager._callFileCatalogFcn: Failed to execute '%s'." % method, res["Message"] ) return res + def _fcFuncWrapper( self, singleFile=False ): + """ choose wrapper to call + + :param self: self reference + :param bool singleFile: flag to choose wrapper function, default :False: will + execute :FileCatalog._callFileCatalogFcn: + """ + return { True: self._callFileCatalogFcnSingleFile, + False: self._callFileCatalogFcn }[singleFile] + class CatalogFile( CatalogBase ): + """ + .. class:: CatalogFile - def getCatalogExists( self, lfn, singleFile = False, catalogs = [] ): - """ Determine whether the path is registered in the FileCatalog + Wrappers for various :FileCatalog: methods concering operations on files. + """ + def getCatalogExists( self, lfn, singleFile=False, catalogs=None ): + """ determine whether the path is registered in the :FileCatalog: by calling + :FileCatalog.exists: method. - 'lfn' is the files to check (can be a single file or list of lfns) + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'exists', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'exists', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "exists", catalogs=catalogs ) - def getCatalogIsFile( self, lfn, singleFile = False, catalogs = [] ): - """ Determine whether the path is registered as a file in the FileCatalog + def getCatalogIsFile( self, lfn, singleFile=False, catalogs=None ): + """ determine whether the path is registered as a file in the :FileCatalog: - 'lfn' is the files to check (can be a single file or list of lfns) + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'isFile', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'isFile', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "isFile", catalogs=catalogs ) - def getCatalogFileMetadata( self, lfn, singleFile = False, catalogs = [] ): - """ Get the metadata associated to a file in the FileCatalog + def getCatalogFileMetadata( self, lfn, singleFile=False, catalogs=None ): + """ get the metadata associated to the LFN in the :FileCatalog: - 'lfn' is the files to check (can be a single file or list of lfns) + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getFileMetadata', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getFileMetadata', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getFileMetadata", catalogs=catalogs ) - def getCatalogFileSize( self, lfn, singleFile = False, catalogs = [] ): - """ Get the size registered for files in the FileCatalog + def getCatalogFileSize( self, lfn, singleFile=False, catalogs=None ): + """ get the size registered for files in the FileCatalog - 'lfn' is the files to check (can be a single lfn or list of lfns) + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getFileSize', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getFileSize', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getFileSize", catalogs=catalogs ) - def getCatalogReplicas( self, lfn, allStatus = False, singleFile = False, catalogs = [] ): + def getCatalogReplicas( self, lfn, allStatus=False, singleFile=False, catalogs=None ): """ Get the replicas registered for files in the FileCatalog - 'lfn' is the files to check (can be a single lfn or list of lfns) + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool allStatus: ??? + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getReplicas', argsDict = {'allStatus':allStatus}, catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getReplicas', argsDict = {'allStatus':allStatus}, catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getReplicas", argsDict={ "allStatus" : allStatus }, + catalogs=catalogs ) - def getCatalogLFNForPFN( self, pfn, singleFile = False, catalogs = [] ): - """ Get the LFNs registered with the supplied PFNs from the FileCatalog + def getCatalogLFNForPFN( self, pfn, singleFile=False, catalogs=None ): + """ get the LFNs registered with the supplied PFNs from the FileCatalog - 'pfn' is the files to obtain (can be a single pfn or list of pfns) + :param self: self reference + :param mixed pfn: the files to obtain (can be a single PFN or list of PFNs) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( pfn, 'getLFNForPFN', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( pfn, 'getLFNForPFN', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( pfn, 'getLFNForPFN', catalogs=catalogs ) - def addCatalogFile( self, lfn, singleFile = False, catalogs = [] ): + def addCatalogFile( self, lfn, singleFile=False, catalogs=None ): """ Add a new file to the FileCatalog - 'lfn' is the dictionary containing the file properties + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'addFile', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'addFile', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "addFile", catalogs=catalogs ) - def removeCatalogFile( self, lfn, singleFile = False, catalogs = [] ): - """ Remove a file from the FileCatalog + def removeCatalogFile( self, lfn, singleFile=False, catalogs=None ): + """ remove a file from the FileCatalog - 'lfn' is the file to be removed + :param self: self reference + :param mixed lfn: LFN as string or list of LFN strings or dict with LFNs as keys + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'removeFile', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'removeFile', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "removeFile", catalogs=catalogs ) class CatalogReplica( CatalogBase ): + """ + .. class:: CatalogReplica - def getCatalogReplicaStatus( self, lfn, singleFile = False, catalogs = [] ): - """ Get the status of the replica as registered in the FileCatalog + Wrappers for various :FileCatalog: methods concering operations on replicas. + """ + def getCatalogReplicaStatus( self, lfn, singleFile=False, catalogs=None ): + """ get the status of the replica as registered in the :FileCatalog: - 'lfn' is a dictionary containing {LFN:SE} + :param self: self reference + :param dict lfn: dict containing { LFN : SE } + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getReplicaStatus', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getReplicaStatus', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getReplicaStatus", catalogs=catalogs ) - def addCatalogReplica( self, lfn, singleFile = False, catalogs = [] ): - """ Add a new replica to the FileCatalog + def addCatalogReplica( self, lfn, singleFile=False, catalogs=None ): + """ add a new replica to the :FileCatalog: - 'lfn' is the dictionary containing the replica properties + :param self: self reference + :param dict lfn: dictionary containing the replica properties + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'addReplica', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'addReplica', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "addReplica", catalogs=catalogs ) - def removeCatalogReplica( self, lfn, singleFile = False, catalogs = [] ): - """ Remove a replica from the FileCatalog + def removeCatalogReplica( self, lfn, singleFile=False, catalogs=None ): + """ remove a replica from the :FileCatalog: - 'lfn' is the file to be removed + :param self: self reference + :param mixed lfn: lfn to be removed + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'removeReplica', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'removeReplica', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "removeReplica", catalogs=catalogs ) - def setCatalogReplicaStatus( self, lfn, singleFile = False, catalogs = [] ): - """ Change the status for a replica in the FileCatalog + def setCatalogReplicaStatus( self, lfn, singleFile=False, catalogs=None ): + """ Change the status for a replica in the :FileCatalog: - 'lfn' is the replica information to change + :param self: self reference + :param mixed lfn: dict with replica information to change + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'setReplicaStatus', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'setReplicaStatus', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "setReplicaStatus", catalogs=catalogs ) - def setCatalogReplicaHost( self, lfn, singleFile = False, catalogs = [] ): - """ Change the registered SE for a replica in the FileCatalog + def setCatalogReplicaHost( self, lfn, singleFile=False, catalogs=None ): + """ change the registered SE for a replica in the :FileCatalog: - 'lfn' is the replica information to change + :param self: self reference + :param mixed lfn: dict with replica information to change + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'setReplicaHost', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'setReplicaHost', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "setReplicaHost", catalogs=catalogs ) class CatalogDirectory( CatalogBase ): + """ + .. class:: CatalogDirectory - def getCatalogIsDirectory( self, lfn, singleFile = False, catalogs = [] ): - """ Determine whether the path is registered as a directory in the FileCatalog + Wrappers for various :FileCatalog: methods concering operations on folders. + """ + def getCatalogIsDirectory( self, lfn, singleFile=False, catalogs=None ): + """ determine whether the path is registered as a directory in the :FileCatalog: - 'lfn' is the files to check (can be a single file or list of lfns) + :param self: self reference + :param mixed lfn: files to check (can be a single file or list of lfns) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'isDirectory', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'isDirectory', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "isDirectory", catalogs=catalogs ) - def getCatalogDirectoryMetadata( self, lfn, singleFile = False, catalogs = [] ): - """ Get the metadata associated to a directory in the FileCatalog + def getCatalogDirectoryMetadata( self, lfn, singleFile=False, catalogs=None ): + """ get the metadata associated to a directory in the :FileCatalog: - 'lfn' is the directories to check (can be a single directory or list of directories) + :param self: self reference + :param mixed lfn: folders to check (can be a single directory or list of directories) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getDirectoryMetadata', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getDirectoryMetadata', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getDirectoryMetadata", catalogs=catalogs ) - def getCatalogDirectoryReplicas( self, lfn, singleFile = False, catalogs = [] ): - """ Get the replicas for the contents of a directory in the FileCatalog + def getCatalogDirectoryReplicas( self, lfn, singleFile=False, catalogs=None ): + """ get the replicas for the contents of a directory in the FileCatalog - 'lfn' is the directories to check (can be a single directory or list of directories) + :param self: self reference + :param mixed lfn: folders to check (can be a single directory or list of directories) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getDirectoryReplicas', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getDirectoryReplicas', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getDirectoryReplicas", catalogs=catalogs ) - def getCatalogListDirectory( self, lfn, verbose = False, singleFile = False, catalogs = [] ): - """ Get the contents of a directory in the FileCatalog + def getCatalogListDirectory( self, lfn, verbose=False, singleFile=False, catalogs=None ): + """ get the contents of a directory in the :FileCatalog: - 'lfn' is the directories to check (can be a single directory or list of directories) + :param self: self reference + :param mixed lfn: folders to check (can be a single directory or list of directories) + :param bool verbose: shout + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'listDirectory', argsDict = {'verbose':verbose}, catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'listDirectory', argsDict = {'verbose':verbose}, catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "listDirectory", argsDict={"verbose": verbose}, + catalogs=catalogs ) - def getCatalogDirectorySize( self, lfn, singleFile = False, catalogs = [] ): - """ Get the size a directory in the FileCatalog + def getCatalogDirectorySize( self, lfn, singleFile=False, catalogs=None ): + """ get the size a directory in the :FileCatalog: - 'lfn' is the directories to check (can be a single directory or list of directories) + :param self: self reference + :param mixed lfn: folders to check (can be a single directory or list of directories) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'getDirectorySize', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'getDirectorySize', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "getDirectorySize", catalogs=catalogs ) - def createCatalogDirectory( self, lfn, singleFile = False, catalogs = [] ): - """ Create the directory supplied in the FileCatalog + def createCatalogDirectory( self, lfn, singleFile=False, catalogs=None ): + """ mkdir in the :FileCatalog: - 'lfn' is the directory to create + :param self: self reference + :param mixed lfn: the directory to create + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'createDirectory', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'createDirectory', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "createDirectory", catalogs=catalogs ) - def removeCatalogDirectory( self, lfn, recursive = False, singleFile = False, catalogs = [] ): - """ Remove the directory supplied from the FileCatalog + def removeCatalogDirectory( self, lfn, recursive=False, singleFile=False, catalogs=None ): + """ rmdir from the :FileCatalog: - 'lfn' is the directory to remove + :param self: self reference + :param mixed lfn: the directory to remove + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'removeDirectory', argsDict = {'recursive':recursive}, catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'removeDirectory', argsDict = {'recursive':recursive}, catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "removeDirectory", argsDict={"recursive" : recursive}, + catalogs=catalogs ) class CatalogLink( CatalogBase ): + """ + .. class:: CatalogReplica - def getCatalogIsLink( self, lfn, singleFile = False, catalogs = [] ): - """ Determine whether the path is registered as a link in the FileCatalog + Wrappers for various :FileCatalog: methods concering operations on links. + """ + def getCatalogIsLink( self, lfn, singleFile=False, catalogs=None ): + """ determine whether the path is registered as a link in the :FileCatalog: - 'lfn' is the paths to check (can be a single path or list of paths) + :param self: self reference + :param mixed lfn: path to be checked (string of list of strings) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'isLink', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'isLink', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "isLink", catalogs=catalogs ) - def getCatalogReadLink( self, lfn, singleFile = False, catalogs = [] ): - """ Get the target of a link as registered in the FileCatalog + def getCatalogReadLink( self, lfn, singleFile=False, catalogs=None ): + """ get the target of a link as registered in the :FileCatalog: - 'lfn' is the links to check (can be a single link or list of links) + :param self: self reference + :param mixed lfn: path to be checked (string of list of strings) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'readLink', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'readLink', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "readLink", catalogs=catalogs ) - def createCatalogLink( self, lfn, singleFile = False, catalogs = [] ): - """ Create the link supplied in the FileCatalog + def createCatalogLink( self, lfn, singleFile=False, catalogs=None ): + """ ln in the :FileCatalog: (create the link) - 'lfn' is the link dictionary containing the target lfn and link name to create + :param self: self reference + :param mixed lfn: link dictionary containing the target lfn and link name to create + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'createLink', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'createLink', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + return self._fcFuncWrapper(singleFile)( lfn, "createLink", catalogs=catalogs ) - def removeCatalogLink( self, lfn, singleFile = False, catalogs = [] ): - """ Remove the link supplied from the FileCatalog + def removeCatalogLink( self, lfn, singleFile=False, catalogs=None ): + """ rm the link supplied from the :FileCatalog: - 'lfn' is the link to remove + :param self: self reference + :param mixed lfn: link to be removed (string of list of strings) + :param bool singleFile: execute for the first LFN only + :param list catalogs: catalogs' names """ - if singleFile: - return self._executeSingleFileCatalogFunction( lfn, 'removeLink', catalogs = catalogs ) - else: - return self._executeFileCatalogFunction( lfn, 'removeLink', catalogs = catalogs ) + catalogs = catalogs if catalogs else list() + self._fcFuncWrapper(singleFile)( lfn, "removeLink", catalogs=catalogs ) class CatalogInterface( CatalogFile, CatalogReplica, CatalogDirectory, CatalogLink ): - """ Dummy class to expose all the methods of the CatalogInterface + """ + .. class:: CatalogInterface + + Dummy class to expose all the methods of the CatalogInterface """ pass class StorageBase: - - def __init__( self ): - """ This class stores the two wrapper functions for interacting with the StorageElement: - - _executeStorageElementFunction(storageElementName,pfn,method,argsDict={}) - Is a wrapper around the available StorageElement() functions. - The 'storageElementName', 'pfn' and 'method' arguments must be provided: - 'storageElementName' is the DIRAC SE name to be accessed e.g. CERN-DST. - 'pfn' contains a single pfn string or a list or dictionary containing the required files. - 'method' is the name of the StorageElement() method to be invoked. - 'argsDict' contains additional arguments that are required for the method. - - _executeSingleStorageElementFunction(storageElementName,pfn,method,argsDict={}) - Is a wrapper around _executeStorageElementFunction(). - It parses the output of _executeStorageElementFunction() for the first pfn provided as input. - If this pfn is found in: - res['Value']['Successful'] an S_OK() is returned with the value. - res['Value']['Failed'] an S_ERROR() is returned with the error message. - """ - pass - - def _executeSingleStorageElementFunction( self, storageElementName, pfn, method, argsDict = {} ): - res = self._executeStorageElementFunction( storageElementName, pfn, method, argsDict ) - if type( pfn ) == types.ListType: + """ + .. class:: StorageBase + + This class stores the two wrapper functions for interacting with the StorageElement. + """ + def _callStorageElementFcnSingleFile( self, storageElementName, pfn, method, argsDict=None ): + """ wrapper around :StorageBase._callStorageElementFcn: for single file execution + + It parses the output of :StorageBase._callStorageElementFcn: for the first pfn provided as input. + If this pfn is found in:: + + * res['Value']['Successful'] an S_OK() is returned with the value. + * res['Value']['Failed'] an S_ERROR() is returned with the error message. + + :param self: self reference + :param str storageElementName: DIRAC SE name to be accessed e.g. CERN-DST + :param mixed pfn: contains a single PFN string or a list of PFNs or dictionary containing PFNs + :param str method: name of the :StorageElement: method to be invoked + :param dict argsDict: additional keyword arguments that are required for the :method: + """ + argsDict = argsDict if argsDict else dict() + ## check type + if type( pfn ) == ListType: pfn = pfn[0] - elif type( pfn ) == types.DictType: + elif type( pfn ) == DictType: pfn = pfn.keys()[0] - if not res['OK']: + ## call wrapper + res = self._callStorageElementFcn( storageElementName, str(pfn), method, argsDict ) + ## check results + if not res["OK"]: return res - elif res['Value']['Failed'].has_key( pfn ): - errorMessage = res['Value']['Failed'][pfn] + elif pfn in res["Value"]["Failed"]: + errorMessage = res["Value"]["Failed"][pfn] return S_ERROR( errorMessage ) else: - return S_OK( res['Value']['Successful'][pfn] ) - - def _executeStorageElementFunction( self, storageElementName, pfn, method, argsDict = {} ): - """ A simple wrapper around the storage element functionality - """ - # First check the supplied pfn(s) are the correct format. - if type( pfn ) in types.StringTypes: - pfns = {pfn:False} - elif type( pfn ) == types.ListType: - pfns = {} - for url in pfn: - pfns[url] = False - elif type( pfn ) == types.DictType: + return S_OK( res["Value"]["Successful"][pfn] ) + + def _callStorageElementFcn( self, storageElementName, pfn, method, argsDict=None ): + """ a simple wrapper around the :StorageElement: functionality + + :param self: self reference + :param str storageElementName: DIRAC SE name to be accessed e.g. CERN-DST + :param mixed pfn: contains a single PFN string or a list of PFNs or dictionary containing PFNs + :param str method: name of the :StorageElement: method to be invoked + :param dict argsDict: additional keyword arguments that are required for the :method: + """ + argsDict = argsDict if argsDict else {} + ## check pfn type + if type( pfn ) in StringTypes: + pfns = {pfn : False} + elif type( pfn ) == ListType: + pfns = dict.fromkeys( pfn, False ) + elif type( pfn ) == DictType: pfns = pfn.copy() else: - errStr = "ReplicaManager._executeStorageElementFunction: Supplied pfns must be string or list of strings or a dictionary." + errStr = "ReplicaManager._callStorageElementFcn: Supplied pfns must be a str, list of str or dict." gLogger.error( errStr ) return S_ERROR( errStr ) - # Check we have some pfns + ## have we got some pfns? if not pfns: - errMessage = "ReplicaManager._executeStorageElementFunction: No pfns supplied." + errMessage = "ReplicaManager._callStorageElementFcn: No pfns supplied." gLogger.error( errMessage ) return S_ERROR( errMessage ) - gLogger.debug( "ReplicaManager._executeStorageElementFunction: Attempting to perform '%s' operation with %s pfns." % ( method, len( pfns ) ) ) - # Check we can instantiate the storage element correctly + gLogger.debug( "ReplicaManager._callStorageElementFcn: Will execute '%s' with %s pfns." % ( method, len( pfns ) ) ) + ## make sure StorageElement is valid overwride = False - if method in ['removeFile', 'removeDirectory']: + if method in [ "removeFile", "removeDirectory"]: overwride = True - storageElement = StorageElement( storageElementName, overwride = overwride ) + storageElement = StorageElement( storageElementName, overwride=overwride ) res = storageElement.isValid( method ) if not res['OK']: - errStr = "ReplicaManager._executeStorageElementFunction: Failed to instantiate Storage Element" + errStr = "ReplicaManager._callStorageElementFcn: Failed to instantiate Storage Element" gLogger.error( errStr, "for performing %s at %s." % ( method, storageElementName ) ) return res - # Generate the execution string - if argsDict: - execString = "res = storageElement.%s(pfns" % method - for argument, value in argsDict.items(): - if type( value ) == types.StringType: - execString = "%s, %s='%s'" % ( execString, argument, value ) - else: - execString = "%s, %s=%s" % ( execString, argument, value ) - execString = "%s)" % execString - else: - execString = "res = storageElement.%s(pfns)" % method - # Execute the execute string - try: - exec( execString ) - except AttributeError, errMessage: - exceptStr = "ReplicaManager._executeStorageElementFunction: Exception while perfoming %s." % method - gLogger.exception( exceptStr, str( errMessage ) ) - return S_ERROR( exceptStr ) - # Return the output - if not res['OK']: - errStr = "ReplicaManager._executeStorageElementFunction: Completely failed to perform %s." % method - gLogger.error( errStr, '%s : %s' % ( storageElementName, res['Message'] ) ) + ## get sybmbol + fcFcn = getattr( storageElement, method ) if hasattr( storageElement, method ) else None + ## make sure it is callable + fcFcn = fcFcn if callable(fcFcn) else None + if not fcFcn: + errMsg = "ReplicaManager._callStorageElementFcn: '%s' isn't a member function in StorageElement." % method + gLogger.error( errMsg ) + return S_ERROR( errMsg ) + ## call it at least + res = fcFcn( pfns, **argsDict ) + ## return the output + if not res["OK"]: + errStr = "ReplicaManager._callStorageElementFcn: Completely failed to perform %s." % method + gLogger.error( errStr, '%s : %s' % ( storageElementName, res["Message"] ) ) return res + def _seFuncWrapper( self, singleFile=False ): + """ choose wrapper to call + + :param self: self reference + :param bool singleFile: flag to choose wrapper function, default :False: will + execute :StorageBase._callStorageElementFcn: + """ + return { True: self._callStorageElementFcnSingleFile, + False: self._callStorageElementFcn }[singleFile] + def getPfnForLfn( self, lfns, storageElementName ): + """ get PFNs for supplied LFNs at :storageElementName: SE + + :param self: self reference + :param list lfns: list of LFNs + :param str stotrageElementName: DIRAC SE name + """ storageElement = StorageElement( storageElementName ) - res = storageElement.isValid( 'getPfnForLfn' ) + res = storageElement.isValid( "getPfnForLfn" ) if not res['OK']: - errStr = "ReplicaManager.getPfnForLfn: Failed to instantiate Storage Element" - gLogger.error( errStr, "for performing getPfnForLfn at %s." % ( storageElementName ) ) + gLogger.error( "ReplicaManager.getPfnForLfn: Failed to instantiate StorageElement at %s" % storageElementName ) return res - successful = {} - failed = {} + retDict = { "Successful" : {}, "Failed" : {} } for lfn in lfns: res = storageElement.getPfnForLfn( lfn ) - if res['OK']: - successful[lfn] = res['Value'] + if res["OK"]: + retDict["Successful"][lfn] = res["Value"] else: - failed[lfn] = res['Message'] - resDict = {'Successful':successful, 'Failed':failed} - return S_OK( resDict ) + retDict["Failed"][lfn] = res["Message"] + return S_OK( retDict ) def getLfnForPfn( self, pfns, storageElementName ): + """ get LFNs for supplied PFNs at :storageElementName: SE + + :param self: self reference + :param list lfns: list of LFNs + :param str stotrageElementName: DIRAC SE name + """ storageElement = StorageElement( storageElementName ) - res = storageElement.isValid( 'getPfnPath' ) + res = storageElement.isValid( "getPfnPath" ) if not res['OK']: - errStr = "ReplicaManager.getLfnForPfn: Failed to instantiate Storage Element" - gLogger.error( errStr, "for performing getPfnPath at %s." % ( storageElementName ) ) + gLogger.error( "ReplicaManager.getLfnForPfn: Failed to instantiate StorageElement at %s" % storageElementName ) return res - successful = {} - failed = {} + retDict = { "Successful" : {}, "Failed" : {} } for pfn in pfns: res = storageElement.getPfnPath( pfn ) - if res['OK']: - successful[pfn] = res['Value'] + if res["OK"]: + retDict["Successful"][pfn] = res["Value"] else: - failed[pfn] = res['Message'] - resDict = {'Successful':successful, 'Failed':failed} - return S_OK( resDict ) + retDict["Failed"][pfn] = res["Message"] + return S_OK( retDict ) + + def getPfnForProtocol( self, pfns, storageElementName, protocol="SRM2", withPort=True ): + """ create PFNs strings at :storageElementName: SE using protocol :protocol: - def getPfnForProtocol( self, pfns, storageElementName, protocol = 'SRM2', withPort = True ): + :param self: self reference + :param list pfns: list of PFNs + :param str storageElementName: DIRAC SE name + :param str protocol: protocol name (default: 'SRM2') + :param bool withPort: flag to include port in PFN (default: True) + """ storageElement = StorageElement( storageElementName ) - res = storageElement.isValid( 'getPfnForProtocol' ) - if not res['OK']: - errStr = "ReplicaManager.getPfnForLfn: Failed to instantiate Storage Element" - gLogger.error( errStr, "for performing getPfnForProtocol at %s." % ( storageElementName ) ) + res = storageElement.isValid( "getPfnForProtocol" ) + if not res["OK"]: + gLogger.error("ReplicaManager.getPfnForProtocol: Failed to instantiate StorageElement at %s" % storageElementName) return res - successful = {} - failed = {} + retDict = { "Successful" : {}, "Failed" : {}} for pfn in pfns: - res = storageElement.getPfnForProtocol( pfn, protocol, withPort = withPort ) - if res['OK']: - successful[pfn] = res['Value'] + res = storageElement.getPfnForProtocol( pfn, protocol, withPort=withPort ) + if res["OK"]: + retDict["Successful"][pfn] = res["Value"] else: - failed[pfn] = res['Message'] - resDict = {'Successful':successful, 'Failed':failed} - return S_OK( resDict ) + retDict["Failed"][pfn] = res["Message"] + return S_OK( retDict ) class StorageFile( StorageBase ): + """ + .. class:: StorageFile - ########################################################################## - # - # These are the storage element wrapper functions available for physical files - # - - def getStorageFileExists( self, physicalFile, storageElementName, singleFile = False ): - """ Determine the existance of the physical files + Wrappers for various :StorageElement: methods concering operations on files. + """ + def getStorageFileExists( self, physicalFile, storageElementName, singleFile=False ): + """ determine the existance of the physical files - 'physicalFile' is the pfn(s) to be checked - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'exists' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'exists' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "exists" ) - def getStorageFileIsFile( self, physicalFile, storageElementName, singleFile = False ): - """ Determine the physical paths are files + def getStorageFileIsFile( self, physicalFile, storageElementName, singleFile=False ): + """ determine if supplied physical paths are files - 'physicalFile' is the pfn(s) to be checked - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'isFile' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'isFile' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "isFile" ) - def getStorageFileSize( self, physicalFile, storageElementName, singleFile = False ): - """ Obtain the size of the physical files + def getStorageFileSize( self, physicalFile, storageElementName, singleFile=False ): + """ get the size of the physical files - 'physicalFile' is the pfn(s) size to be obtained - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'getFileSize' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'getFileSize' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "getFileSize" ) - def getStorageFileAccessUrl( self, physicalFile, storageElementName, protocol = [], singleFile = False ): - """ Obtain the access url for a physical file + def getStorageFileAccessUrl( self, physicalFile, storageElementName, protocol=None, singleFile=False ): + """ get the access url for a physical file - 'physicalFile' is the pfn(s) to access - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'getAccessUrl', argsDict = {'protocol':protocol} ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'getAccessUrl', argsDict = {'protocol':protocol} ) + protocol = protocol if protocol else list() + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, + "getAccessUrl", argsDict={"protocol" : protocol} ) - def getStorageFileMetadata( self, physicalFile, storageElementName, singleFile = False ): - """ Obtain the metadata for physical files + def getStorageFileMetadata( self, physicalFile, storageElementName, singleFile=False ): + """ get the metadatas for physical files - 'physicalFile' is the pfn(s) to be checked - 'storageElementName' is the Storage Element to check + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'getFileMetadata' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'getFileMetadata' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "getFileMetadata" ) - def removeStorageFile( self, physicalFile, storageElementName, singleFile = False ): - """ Remove physical files + def removeStorageFile( self, physicalFile, storageElementName, singleFile=False ): + """ rm supplied physical files from :storageElementName: DIRAC SE - 'physicalFile' is the pfn(s) to be removed - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'removeFile' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'removeFile' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "removeFile" ) - def prestageStorageFile( self, physicalFile, storageElementName, lifetime = 60 * 60 * 24, singleFile = False ): - """ Prestage physical files + def prestageStorageFile( self, physicalFile, storageElementName, lifetime=86400, singleFile=False ): + """ prestage physical files - 'physicalFile' is the pfn(s) to be pre-staged - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: PFNs to be prestaged + :param str storageElement: SE name + :param int lifetime: 24h in seconds + :param bool singleFile: flag to prestage only one file """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'prestageFile', {'lifetime':lifetime} ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'prestageFile', {'lifetime':lifetime} ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, + "prestageFile", argsDict={"lifetime" : lifetime} ) - def getPrestageStorageFileStatus( self, physicalFile, storageElementName, singleFile = False ): - """ Obtain the status of a pre-stage request + def getPrestageStorageFileStatus( self, physicalFile, storageElementName, singleFile=False ): + """ get the status of a pre-stage request - 'physicalFile' is the pfn(s) to obtain the status - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'prestageFileStatus' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'prestageFileStatus' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "prestageFileStatus" ) - def pinStorageFile( self, physicalFile, storageElementName, lifetime = 60 * 60 * 24, singleFile = False ): - """ Pin physical files with a given lifetime + def pinStorageFile( self, physicalFile, storageElementName, lifetime=86400, singleFile=False ): + """ pin physical files with a given lifetime - 'physicalFile' is the pfn(s) to pin - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param int lifetime: 24h in seconds + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'pinFile', {'lifetime':lifetime} ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'pinFile', {'lifetime':lifetime} ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, + "pinFile", argsDict={"lifetime": lifetime} ) - def releaseStorageFile( self, physicalFile, storageElementName, singleFile = False ): - """ Release the pin on physical files + def releaseStorageFile( self, physicalFile, storageElementName, singleFile=False ): + """ release the pin on physical files - 'physicalFile' is the pfn(s) to release - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'releaseFile' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'releaseFile' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "releaseFile" ) - def getStorageFile( self, physicalFile, storageElementName, localPath = False, singleFile = False ): - """ Get a local copy of a physical file + def getStorageFile( self, physicalFile, storageElementName, localPath=False, singleFile=False ): + """ create a local copy of a physical file - 'physicalFile' is the pfn(s) to get - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed physicalFile: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param mixed localPath: string with local paht to use or False (if False, os.getcwd() will be used) + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'getFile', argsDict = {'localPath':localPath} ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'getFile', argsDict = {'localPath':localPath} ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, + "getFile", argsDict={"localPath": localPath} ) - def putStorageFile( self, physicalFile, storageElementName, singleFile = False ): - """ Put a local file to the storage element + def putStorageFile( self, physicalFile, storageElementName, singleFile=False ): + """ put the local file to the storage element - 'physicalFile' is the pfn(s) dict to put - 'storageElementName' is the StorageElement + :param self: self reference + :param mixed physicalFile: dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'putFile' ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'putFile' ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, "putFile" ) def replicateStorageFile( self, physicalFile, size, storageElementName, singleFile = False ): - """ Replicate a physical file to a storage element + """ replicate a physical file to a storage element - 'physicalFile' is the pfn(s) dict to replicate - 'storageElementName' is the target StorageElement + :param self: self reference + :param mixed physicalFile: dictionary with PFN information + :param int size: size of PFN in bytes + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first PFN only """ - if singleFile: - return self._executeSingleStorageElementFunction( storageElementName, physicalFile, 'replicateFile', argsDict = {'sourceSize':size} ) - else: - return self._executeStorageElementFunction( storageElementName, physicalFile, 'replicateFile', argsDict = {'sourceSize':size} ) + return self._seFuncWrapper(singleFile)( storageElementName, physicalFile, + 'replicateFile', argsDict ={'sourceSize': size} ) class StorageDirectory( StorageBase ): + """ + .. class:: StorageDirectory - ########################################################################## - # - # These are the storage element wrapper functions available for directories - # - - def getStorageDirectoryIsDirectory( self, storageDirectory, storageElementName, singleDirectory = False ): - """ Determine the storage paths are directories + Wrappers for various :StorageElement: methods concering operations on folders. + """ + def getStorageDirectoryIsDirectory( self, storageDirectory, storageElementName, singleDirectory=False ): + """ determine if the storage paths are directories - 'storageDirectory' is the pfn(s) to be checked - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'isDirectory' ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'isDirectory' ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, "isDirectory" ) - def getStorageDirectoryMetadata( self, storageDirectory, storageElementName, singleDirectory = False ): - """ Obtain the metadata for storage directories + def getStorageDirectoryMetadata( self, storageDirectory, storageElementName, singleDirectory=False ): + """ get the metadata for storage directories - 'storageDirectory' is the pfn(s) to be checked - 'storageElementName' is the Storage Element to check + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'getDirectoryMetadata' ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'getDirectoryMetadata' ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, "getDirectoryMetadata" ) - def getStorageDirectorySize( self, storageDirectory, storageElementName, singleDirectory = False ): - """ Obtain the size of the storage directories + def getStorageDirectorySize( self, storageDirectory, storageElementName, singleDirectory=False ): + """ get the size of the storage directories - 'storageDirectory' is the pfn(s) size to be obtained - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'getDirectorySize' ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'getDirectorySize' ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, "getDirectorySize" ) - def getStorageListDirectory( self, storageDirectory, storageElementName, singleDirectory = False ): - """ List the contents of a directory in the Storage Element + def getStorageListDirectory( self, storageDirectory, storageElementName, singleDirectory=False ): + """ ls of a directory in the Storage Element - 'storageDirectory' is the pfn(s) directory to be obtained - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'listDirectory' ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'listDirectory' ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, "listDirectory" ) - def getStorageDirectory( self, storageDirectory, storageElementName, localPath = False, singleDirectory = False ): - """ Get locally the contents of a directory from the Storage Element + def getStorageDirectory( self, storageDirectory, storageElementName, localPath=False, singleDirectory=False ): + """ copy the contents of a directory from the Storage Element to local folder - 'storageDirectory' is the pfn(s) directory to be obtained - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param mixed localPath: destination folder, if False, so.getcwd() will be used + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'getDirectory', argsDict = {'localPath':localPath} ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'getDirectory', argsDict = {'localPath':localPath} ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, + "getDirectory", argsDict={'localPath': localPath} ) - def putStorageDirectory( self, storageDirectory, storageElementName, singleDirectory = False ): - """ Put a local directory to the storage element + def putStorageDirectory( self, storageDirectory, storageElementName, singleDirectory=False ): + """ put the local directory to the storage element - 'storageDirectory' is the pfn(s) directory to be put - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'putDirectory' ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'putDirectory' ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, "putDirectory" ) - def removeStorageDirectory( self, storageDirectory, storageElementName, recursive = False, singleDirectory = False ): - """ Revove a directory from the storage element + def removeStorageDirectory( self, storageDirectory, storageElementName, recursive=False, singleDirectory=False ): + """ rmdir a directory from the storage element - 'storageDirectory' is the pfn(s) directory to be removed - 'storageElementName' is the Storage Element + :param self: self reference + :param mixed storageDirectory: string with PFN or list with PFNs or dictionary with PFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleDirectory: execute for the first PFN only """ - if singleDirectory: - return self._executeSingleStorageElementFunction( storageElementName, storageDirectory, 'removeDirectory', argsDict = {'recursive':recursive} ) - else: - return self._executeStorageElementFunction( storageElementName, storageDirectory, 'removeDirectory', argsDict = {'recursive':recursive} ) + return self._seFuncWrapper(singleDirectory)( storageElementName, storageDirectory, + "removeDirectory", argsDict={"recursive": recursive} ) class StorageInterface( StorageFile, StorageDirectory ): - """ Dummy class to expose all the methods of the StorageInterface + """ + .. class:: StorageInterface + + Dummy class to expose all the methods of the StorageInterface """ pass class CatalogToStorage( CatalogInterface, StorageInterface ): + """ + .. class:: CatalogToStorage - ########################################################################## - # - # These are the wrapper functions for doing simple replica->SE operations - # + Collection of functions doing simple replica<-->Storage element operations. + """ + def _replicaSEFcnWrapper( self, singleFile=False): + """ choose wrapper to call + + :param self: self reference + :param bool singleFile: flag to choose wrapper function, default :False: will + execute :CatalogToStorage._callReplicaSEFcn: + """ + return { True: self._callReplicaSEFcnSingleFile, + False: self._callReplicaSEFcn }[singleFile] + + def _callReplicaSEFcnSingleFile( self, storageElementName, lfn, method, argsDict=None ): + """ call :method: of StorageElement :storageElementName: for single :lfn: using :argsDict: kwargs + + :param self: self reference + :param str storageElementName: DIRAC SE name + :param mixed lfn: LFN + :param str method: StorageElement function name + :param dict argsDict: kwargs of :method: + """ + ## default value + argsDict = argsDict if argsDict else dict() + ## get single LFN + singleLfn = lfn + if type( lfn ) == ListType: + singleLfn = lfn[0] + elif type( lfn ) == DictType: + singleLfn = lfn.keys()[0] + ## call method + res = self._callReplicaSEFcn( storageElementName, singleLfn, method, argsDict ) + ## check results + if not res["OK"]: + return res + elif singleLfn in res["Value"]["Failed"]: + return S_ERROR( res["Value"]["Failed"][singleLfn] ) + return S_OK( res["Value"]["Successful"][singleLfn] ) + + def _callReplicaSEFcn( self, storageElementName, lfn, method, argsDict=None ): + """ a simple wrapper that allows replica querying then perform the StorageElement operation + + :param self: self reference + :param str storageElementName: DIRAC SE name + :param mixed lfn: a LFN str, list of LFNs or dict with LFNs as keys + """ + ## default value + argsDict = argsDict if argsDict else dict() + ## get replicas for lfn + res = self._callFileCatalogFcn( lfn, "getReplicas" ) + if not res["OK"]: + errStr = "ReplicaManager._callReplicaSEFcn: Completely failed to get replicas for LFNs." + gLogger.error( errStr, res["Message"] ) + return res + ## returned dict, get failed replicase + retDict = { "Failed": res["Value"]["Failed"], + "Successful" : {} } + ## print errors + for lfn, reason in retDict["Failed"].items(): + gLogger.error( "ReplicaManager._callReplicaSEFcn: Failed to get replicas for file.", + "%s %s" % ( lfn, reason ) ) + ## good replicas + lfnReplicas = res["Value"]["Successful"] + ## store PFN to LFN mapping + pfnDict = {} + for lfn, replicas in lfnReplicas.items(): + if storageElementName in replicas: + pfnDict[replicas[storageElementName]] = lfn + else: + errStr = "ReplicaManager._callReplicaSEFcn: File hasn't got replica at supplied Storage Element." + gLogger.error( errStr, "%s %s" % ( lfn, storageElementName ) ) + retDict["Failed"][lfn] = errStr + ## call StorageElement function at least + res = self._callStorageElementFcn( storageElementName, pfnDict.keys(), method, argsDict ) + ## check result + if not res["OK"]: + errStr = "ReplicaManager._callReplicaSEFcn: Failed to execute %s StorageElement method." % method + gLogger.error( errStr, res["Message"] ) + return res + ## filter out failed nad successful + for pfn, pfnRes in res["Value"]["Successful"].items(): + retDict["Successful"][pfnDict[pfn]] = pfnRes + for pfn, errorMessage in res["Value"]["Failed"].items(): + retDict["Failed"][pfnDict[pfn]] = errorMessage + return S_OK( retDict ) - def getReplicaIsFile( self, lfn, storageElementName, singleFile = False ): - """ Determine whether the supplied lfns are files at the supplied StorageElement + def getReplicaIsFile( self, lfn, storageElementName, singleFile=False ): + """ determine whether the supplied lfns are files at the supplied StorageElement - 'lfn' is the file(s) to check - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'isFile' ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'isFile' ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, "isFile" ) - def getReplicaSize( self, lfn, storageElementName, singleFile = False ): - """ Obtain the file size for the lfns at the supplied StorageElement + def getReplicaSize( self, lfn, storageElementName, singleFile=False ): + """ get the size of files for the lfns at the supplied StorageElement - 'lfn' is the file(s) for which to get the size - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'getFileSize' ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'getFileSize' ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, "getFileSize" ) - def getReplicaAccessUrl( self, lfn, storageElementName, singleFile = False ): - """ Obtain the access url for lfns at the supplied StorageElement + def getReplicaAccessUrl( self, lfn, storageElementName, singleFile=False ): + """ get the access url for lfns at the supplied StorageElement - 'lfn' is the file(s) for which to obtain access URLs - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'getAccessUrl' ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'getAccessUrl' ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, "getAccessUrl" ) - def getReplicaMetadata( self, lfn, storageElementName, singleFile = False ): - """ Obtain the file metadata for lfns at the supplied StorageElement + def getReplicaMetadata( self, lfn, storageElementName, singleFile=False ): + """ get the file metadata for lfns at the supplied StorageElement - 'lfn' is the file(s) for which to get metadata - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'getFileMetadata' ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'getFileMetadata' ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, "getFileMetadata" ) - def prestageReplica( self, lfn, storageElementName, lifetime = 60 * 60 * 24, singleFile = False ): - """ Issue prestage requests for the lfns at the supplied StorageElement + def prestageReplica( self, lfn, storageElementName, lifetime=86400, singleFile=False ): + """ issue a prestage requests for the lfns at the supplied StorageElement - 'lfn' is the file(s) for which to issue prestage requests - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param int lifetime: 24h in seconds + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'prestageFile', {'lifetime':lifetime} ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'prestageFile', {'lifetime':lifetime} ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, + "prestageFile", argsDict={"lifetime": lifetime} ) - def getPrestageReplicaStatus( self, lfn, storageElementName, singleFile = False ): + def getPrestageReplicaStatus( self, lfn, storageElementName, singleFile=False ): """ This functionality is not supported. - """ - return S_ERROR( "This functionality is not supported. Please use getReplicaMetadata and check the 'Cached' element." ) - def pinReplica( self, lfn, storageElementName, lifetime = 60 * 60 * 24, singleFile = False ): - """ Issue a pin for the lfns at the supplied StorageElement - - 'lfn' is the file(s) for which to issue pins - 'storageElementName' is the target Storage Element - 'lifetime' is the pin lifetime (default 1 day) + Then what is it doing here? Not supported -> delete it! """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'pinFile', {'lifetime':lifetime} ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'pinFile', {'lifetime':lifetime} ) + return S_ERROR( "Not supported functionality. Please use getReplicaMetadata and check the 'Cached' element." ) - def releaseReplica( self, lfn, storageElementName, singleFile = False ): - """ Release pins for the lfns at the supplied StorageElement + def pinReplica( self, lfn, storageElementName, lifetime=86400, singleFile=False ): + """ pin the lfns at the supplied StorageElement - 'lfn' is the file(s) for which to release pins - 'storageElementName' is the target Storage Element + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param int lifetime: 24h in seconds + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'releaseFile' ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'releaseFile' ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, + "pinFile", argsDict={"lifetime": lifetime} ) - def getReplica( self, lfn, storageElementName, localPath = False, singleFile = False ): - """ Get the lfns to the local disk from the supplied StorageElement + def releaseReplica( self, lfn, storageElementName, singleFile=False ): + """ release pins for the lfns at the supplied StorageElement - 'lfn' is the file(s) for which to release pins - 'storageElementName' is the target Storage Element - 'localPath' is the local target path (default '.') + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param bool singleFile: execute for the first LFN only """ - if singleFile: - return self.__executeSingleReplicaStorageElementOperation( storageElementName, lfn, 'getFile', {'localPath':localPath} ) - else: - return self.__executeReplicaStorageElementOperation( storageElementName, lfn, 'getFile', {'localPath':localPath} ) - - def __executeSingleReplicaStorageElementOperation( self, storageElementName, lfn, method, argsDict = {} ): - res = self.__executeReplicaStorageElementOperation( storageElementName, lfn, method, argsDict ) - if type( lfn ) == types.ListType: - lfn = lfn[0] - elif type( lfn ) == types.DictType: - lfn = lfn.keys()[0] - if not res['OK']: - return res - elif res['Value']['Failed'].has_key( lfn ): - errorMessage = res['Value']['Failed'][lfn] - return S_ERROR( errorMessage ) - else: - return S_OK( res['Value']['Successful'][lfn] ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, "releaseFile" ) - def __executeReplicaStorageElementOperation( self, storageElementName, lfn, method, argsDict = {} ): - """ A simple wrapper that allows replica querying then perform the StorageElement operation + def getReplica( self, lfn, storageElementName, localPath=False, singleFile=False ): + """ copy replicas from DIRAC SE to local directory + + :param self: self reference + :param mixed lfn: LFN string, list if LFNs or dict with LFNs as keys + :param str storageElementName: DIRAC SE name + :param mixed localPath: path in the local file system, if False, os.getcwd() will be used + :param bool singleFile: execute for the first LFN only """ - res = self._executeFileCatalogFunction( lfn, 'getReplicas' ) - if not res['OK']: - errStr = "ReplicaManager.__executeReplicaStorageElementOperation: Completely failed to get replicas for LFNs." - gLogger.error( errStr, res['Message'] ) - return res - failed = res['Value']['Failed'] - for lfn, reason in res['Value']['Failed'].items(): - gLogger.error( "ReplicaManager.__executeReplicaStorageElementOperation: Failed to get replicas for file.", "%s %s" % ( lfn, reason ) ) - lfnReplicas = res['Value']['Successful'] - pfnDict = {} - for lfn, replicas in lfnReplicas.items(): - if replicas.has_key( storageElementName ): - pfnDict[replicas[storageElementName]] = lfn - else: - errStr = "ReplicaManager.__executeReplicaStorageElementOperation: File does not have replica at supplied Storage Element." - gLogger.error( errStr, "%s %s" % ( lfn, storageElementName ) ) - failed[lfn] = errStr - res = self._executeStorageElementFunction( storageElementName, pfnDict.keys(), method, argsDict ) - if not res['OK']: - gLogger.error( "ReplicaManager.__executeReplicaStorageElementOperation: Failed to execute %s StorageElement operation." % method, res['Message'] ) - return res - successful = {} - for pfn, pfnRes in res['Value']['Successful'].items(): - successful[pfnDict[pfn]] = pfnRes - for pfn, errorMessage in res['Value']['Failed'].items(): - failed[pfnDict[pfn]] = errorMessage - resDict = {'Successful':successful, 'Failed':failed} - return S_OK( resDict ) + return self._replicaSEFcnWrapper(singleFile)( storageElementName, lfn, + "getFile", argsDict={"localPath": localPath} ) class ReplicaManager( CatalogToStorage ): + """ + .. class:: ReplicaManager + A ReplicaManager is putting all possible StorageElement and FileCatalog functionalities togehter. + """ def __init__( self ): - """ Constructor function. + """ c'tor + + :param self: self reference """ self.fileCatalogue = FileCatalog() self.accountingClient = None @@ -910,23 +1046,19 @@ def setAccountingClient( self, client ): def __verifyOperationPermission( self, path ): """ Check if we have write permission to the given directory """ - fc = FileCatalog() res = fc.getPathPermissions( path ) if not res['OK']: return res - paths = path - if type( path ) in types.StringTypes: - paths = [path] - - for p in paths: - if not res['Value']['Successful'].has_key( p ): + if type( path ) in StringTypes: + paths = [ path ] + for path in paths: + if path not in res['Value']['Successful']: return S_OK( False ) - catalogPerm = res['Value']['Successful'][p] - if not ( catalogPerm.has_key( 'Write' ) and catalogPerm['Write'] ): + catalogPerm = res['Value']['Successful'][path] + if not ( "Write" in catalogPerm and catalogPerm['Write'] ): return S_OK( False ) - return S_OK( True ) ########################################################################## @@ -937,30 +1069,33 @@ def __verifyOperationPermission( self, path ): def cleanLogicalDirectory( self, lfnDir ): """ Clean the logical directory from the catalog and storage """ - if type( lfnDir ) in types.StringTypes: + if type( lfnDir ) in StringTypes: lfnDir = [lfnDir] - successful = {} - failed = {} - for dir in lfnDir: - res = self.__cleanDirectory( dir ) + retDict = { "Successful" : {}, "Failed" : {} } + for folder in lfnDir: + res = self.__cleanDirectory( folder ) if not res['OK']: - gLogger.error( "Failed to clean directory.", "%s %s" % ( dir, res['Message'] ) ) - failed[dir] = res['Message'] + gLogger.error( "Failed to clean directory.", "%s %s" % ( folder, res['Message'] ) ) + retDict["Failed"][folder] = res['Message'] else: - gLogger.info( "Successfully removed directory.", dir ) - successful[dir] = res['Value'] - resDict = {'Successful':successful, 'Failed':failed} - return S_OK( resDict ) + gLogger.info( "Successfully removed directory.", folder ) + retDict["Successful"][folder] = res['Value'] + return S_OK( retDict ) - def __cleanDirectory( self, dir ): - res = self.__verifyOperationPermission( dir ) + def __cleanDirectory( self, folder ): + """ delete all files from directory :folder: in FileCatalog and StorageElement + + :param self: self reference + :param str folder: directory name + """ + res = self.__verifyOperationPermission( folder ) if not res['OK']: return res if not res['Value']: errStr = "ReplicaManager.__cleanDirectory: Write access not permitted for this credential." - gLogger.error( errStr, dir ) + gLogger.error( errStr, folder ) return S_ERROR( errStr ) - res = self.__getCatalogDirectoryContents( [dir] ) + res = self.__getCatalogDirectoryContents( [ folder ] ) if not res['OK']: return res replicaDict = {} @@ -981,17 +1116,23 @@ def __cleanDirectory( self, dir ): storageElements = gConfig.getValue( 'Resources/StorageElementGroups/SE_Cleaning_List', [] ) failed = False for storageElement in sortList( storageElements ): - res = self.__removeStorageDirectory( dir, storageElement ) + res = self.__removeStorageDirectory( folder, storageElement ) if not res['OK']: failed = True if failed: return S_ERROR( "Failed to clean storage directory at all SEs" ) - res = self.removeCatalogDirectory( dir, recursive = True, singleFile = True ) + res = self.removeCatalogDirectory( folder, recursive = True, singleFile = True ) if not res['OK']: return res return S_OK() def __removeStorageDirectory( self, directory, storageElement ): + """ delete SE directory + + :param self: self reference + :param str directory: folder to be removed + :param str storageElement: DIRAC SE name + """ gLogger.info( 'Removing the contents of %s at %s' % ( directory, storageElement ) ) res = self.getPfnForLfn( [directory], storageElement ) if not res['OK']: @@ -1002,7 +1143,7 @@ def __removeStorageDirectory( self, directory, storageElement ): if res['Value']['Failed']: return S_ERROR( 'Failed to obtain directory PFN from LFNs' ) storageDirectory = res['Value']['Successful'].values()[0] - res = self.getStorageFileExists( storageDirectory, storageElement, singleFile = True ) + res = self.getStorageFileExists( storageDirectory, storageElement, singleFile=True ) if not res['OK']: gLogger.error( "Failed to obtain existance of directory", res['Message'] ) return res @@ -1010,14 +1151,21 @@ def __removeStorageDirectory( self, directory, storageElement ): if not exists: gLogger.info( "The directory %s does not exist at %s " % ( directory, storageElement ) ) return S_OK() - res = self.removeStorageDirectory( storageDirectory, storageElement, recursive = True, singleDirectory = True ) + res = self.removeStorageDirectory( storageDirectory, storageElement, recursive=True, singleDirectory=True ) if not res['OK']: gLogger.error( "Failed to remove storage directory", res['Message'] ) return res - gLogger.info( "Successfully removed %d files from %s at %s" % ( res['Value']['FilesRemoved'], directory, storageElement ) ) + gLogger.info( "Successfully removed %d files from %s at %s" % ( res['Value']['FilesRemoved'], + directory, + storageElement ) ) return S_OK() def __getCatalogDirectoryContents( self, directories ): + """ ls recursively all files in directories + + :param self: self reference + :param list directories: folder names + """ gLogger.info( 'Obtaining the catalog contents for %d directories:' % len( directories ) ) for directory in directories: gLogger.info( directory ) @@ -1025,7 +1173,7 @@ def __getCatalogDirectoryContents( self, directories ): allFiles = {} while len( activeDirs ) > 0: currentDir = activeDirs[0] - res = self.getCatalogListDirectory( currentDir, singleFile = True ) + res = self.getCatalogListDirectory( currentDir, singleFile=True ) activeDirs.remove( currentDir ) if not res['OK'] and res['Message'].endswith( 'The supplied path does not exist' ): gLogger.info( "The supplied directory %s does not exist" % currentDir ) @@ -1039,7 +1187,12 @@ def __getCatalogDirectoryContents( self, directories ): return S_OK( allFiles ) def getReplicasFromDirectory( self, directory ): - if type( directory ) in types.StringTypes: + """ get all replicas from a given directory + + :param self: self reference + :param mixed directory: list of directories or one directory + """ + if type( directory ) in StringTypes: directories = [directory] else: directories = directory @@ -1051,8 +1204,8 @@ def getReplicasFromDirectory( self, directory ): allReplicas[lfn] = metadata['Replicas'] return S_OK( allReplicas ) - def getFilesFromDirectory( self, directory, days = 0, wildcard = '*' ): - if type( directory ) in types.StringTypes: + def getFilesFromDirectory( self, directory, days=0, wildcard='*' ): + if type( directory ) in StringTypes: directories = [directory] else: directories = directory @@ -1107,9 +1260,9 @@ def getFile( self, lfn, destinationDir = '' ): 'lfn' is the logical file name for the desired file """ - if type( lfn ) == types.ListType: + if type( lfn ) == ListType: lfns = lfn - elif type( lfn ) == types.StringType: + elif type( lfn ) == StringType: lfns = [lfn] else: errStr = "ReplicaManager.getFile: Supplied lfn must be string or list of strings." @@ -1146,7 +1299,10 @@ def __getFile( self, lfn, replicas, metadata, destinationDir ): return res for storageElementName in res['Value']: physicalFile = replicas[storageElementName] - res = self.getStorageFile( physicalFile, storageElementName, localPath = os.path.realpath( destinationDir ), singleFile = True ) + res = self.getStorageFile( physicalFile, + storageElementName, + localPath = os.path.realpath( destinationDir ), + singleFile = True ) if not res['OK']: gLogger.error( "Failed to get %s from %s" % ( lfn, storageElementName ), res['Message'] ) else: @@ -1155,9 +1311,11 @@ def __getFile( self, lfn, replicas, metadata, destinationDir ): localFile = os.path.realpath( "%s/%s" % ( destinationDir, os.path.basename( lfn ) ) ) localAdler = fileAdler( localFile ) if ( metadata['Size'] != res['Value'] ): - gLogger.error( "Size of downloaded file (%d) does not match catalog (%d)" % ( res['Value'], metadata['Size'] ) ) + gLogger.error( "Size of downloaded file (%d) does not match catalog (%d)" % ( res['Value'], + metadata['Size'] ) ) elif ( metadata['Checksum'] ) and ( not compareAdler( metadata['Checksum'], localAdler ) ): - gLogger.error( "Checksum of downloaded file (%s) does not match catalog (%s)" % ( localAdler, metadata['Checksum'] ) ) + gLogger.error( "Checksum of downloaded file (%s) does not match catalog (%s)" % ( localAdler, + metadata['Checksum'] ) ) else: return S_OK( localFile ) gLogger.error( "ReplicaManager.getFile: Failed to get local copy from any replicas.", lfn ) @@ -1167,11 +1325,11 @@ def _getSEProximity( self, ses ): siteName = DIRAC.siteName() localSEs = getSEsForSite( siteName )['Value'] countrySEs = [] - countryCode = siteName.split( '.' )[-1] + countryCode = siteName.split('.')[-1] res = getSEsForCountry( countryCode ) if res['OK']: countrySEs = res['Value'] - sortedSEs = [se for se in localSEs if se in ses] + sortedSEs = [ se for se in localSEs if se in ses ] for se in randomize( ses ): if ( se in countrySEs ) and ( not se in sortedSEs ): sortedSEs.append( se ) @@ -1180,7 +1338,7 @@ def _getSEProximity( self, ses ): sortedSEs.append( se ) return S_OK( sortedSEs ) - def putAndRegister( self, lfn, file, diracSE, guid = None, path = None, checksum = None, catalog = None, ancestors = [] ): + def putAndRegister( self, lfn, file, diracSE, guid=None, path=None, checksum=None, catalog=None, ancestors=None ): """ Put a local file to a Storage Element and register in the File Catalogues 'lfn' is the file LFN @@ -1189,6 +1347,7 @@ def putAndRegister( self, lfn, file, diracSE, guid = None, path = None, checksum 'guid' is the guid with which the file is to be registered (if not provided will be generated) 'path' is the path on the storage where the file will be put (if not provided the LFN will be used) """ + ancestors = ancestors if ancestors else list() res = self.__verifyOperationPermission( os.path.dirname( lfn ) ) if not res['OK']: return res @@ -1236,7 +1395,8 @@ def putAndRegister( self, lfn, file, diracSE, guid = None, path = None, checksum errStr = "ReplicaManager.putAndRegister: The supplied LFN already exists in the File Catalog." gLogger.error( errStr, lfn ) else: - errStr = "ReplicaManager.putAndRegister: This file GUID already exists for another file. Please remove it and try again." + errStr = "ReplicaManager.putAndRegister: This file GUID already exists for another file. " \ + "Please remove it and try again." gLogger.error( errStr, res['Value']['Successful'][lfn] ) return S_ERROR( "%s %s" % ( errStr, res['Value']['Successful'][lfn] ) ) # If the local file name is not the same as the LFN filename then use the LFN file name @@ -1282,7 +1442,7 @@ def putAndRegister( self, lfn, file, diracSE, guid = None, path = None, checksum gDataStoreClient.addRegister( oDataOperation ) startTime = time.time() gDataStoreClient.commit() - gLogger.info( 'ReplicaManager.putAndRegister: Sending accounting took %.1f seconds' % ( time.time() - startTime ) ) + gLogger.info( 'ReplicaManager.putAndRegister: Sending accounting took %.1f seconds' % ( time.time()-startTime ) ) gLogger.error( errStr, "%s: %s" % ( file, res['Message'] ) ) return S_ERROR( "%s %s" % ( errStr, res['Message'] ) ) successful[lfn] = {'put': putTime} @@ -1487,7 +1647,7 @@ def __replicate( self, lfn, destSE, sourceSE = '', destPath = '', localCache = ' def __initializeReplication( self, lfn, sourceSE, destSE ): ########################################################### # Check that the destination storage element is sane and resolve its name - gLogger.verbose( "ReplicaManager.__initializeReplication: Verifying destination Storage Element validity (%s)." % destSE ) + gLogger.verbose( "ReplicaManager.__initializeReplication: Verifying dest StorageElement validity (%s)." % destSE ) destStorageElement = StorageElement( destSE ) res = destStorageElement.isValid() if not res['OK']: @@ -1620,11 +1780,13 @@ def __resolveBestReplicas( self, sourceSE, lfnReplicas, catalogueSize ): def registerFile( self, fileTuple, catalog = '' ): """ Register a file. - 'fileTuple' is the file tuple to be registered of the form (lfn,physicalFile,fileSize,storageElementName,fileGuid) + :param self: self reference + :param tuple fileTuple: (lfn, physicalFile, fileSize, storageElementName, fileGuid ) + :param str catalog: catalog name """ - if type( fileTuple ) == types.ListType: + if type( fileTuple ) == ListType: fileTuples = fileTuple - elif type( fileTuple ) == types.TupleType: + elif type( fileTuple ) == TupleType: fileTuples = [fileTuple] else: errStr = "ReplicaManager.registerFile: Supplied file info must be tuple of list of tuples." @@ -1663,7 +1825,7 @@ def __registerFile( self, fileTuples, catalog ): pfn = physicalFile else: pfn = res['Value'] - tuple = ( lfn, pfn, fileSize, storageElementName, fileGuid, checksum ) + #tuple = ( lfn, pfn, fileSize, storageElementName, fileGuid, checksum ) fileDict[lfn] = {'PFN':pfn, 'Size':fileSize, 'SE':storageElementName, 'GUID':fileGuid, 'Checksum':checksum} gLogger.verbose( "ReplicaManager.__registerFile: Resolved %s files for registration." % len( fileDict.keys() ) ) if catalog: @@ -1685,10 +1847,10 @@ def registerReplica( self, replicaTuple, catalog = '' ): 'replicaTuple' is a tuple or list of tuples of the form (lfn,pfn,se) """ - if type( replicaTuple ) == types.ListType: + if type( replicaTuple ) == ListType: replicaTuples = replicaTuple - elif type( replicaTuple ) == types.TupleType: - replicaTuples = [replicaTuple] + elif type( replicaTuple ) == TupleType: + replicaTuples = [ replicaTuple ] else: errStr = "ReplicaManager.registerReplica: Supplied file info must be tuple of list of tuples." gLogger.error( errStr ) @@ -1756,9 +1918,9 @@ def removeFile( self, lfn ): 'lfn' is the file to be removed """ - if type( lfn ) == types.ListType: + if type( lfn ) == ListType: lfns = lfn - elif type( lfn ) == types.StringType: + elif type( lfn ) == StringType: lfns = [lfn] else: errStr = "ReplicaManager.removeFile: Supplied lfns must be string or list of strings." @@ -1813,7 +1975,7 @@ def __removeFile( self, lfnDict ): storageElementDict = {} for lfn, repDict in lfnDict.items(): for se, pfn in repDict.items(): - if not storageElementDict.has_key( se ): + if se not in storageElementDict: storageElementDict[se] = [] storageElementDict[se].append( ( lfn, pfn ) ) failed = {} @@ -1823,17 +1985,17 @@ def __removeFile( self, lfnDict ): if not res['OK']: errStr = res['Message'] for lfn, pfn in fileTuple: - if not failed.has_key( lfn ): + if lfn not in failed: failed[lfn] = '' failed[lfn] = "%s %s" % ( failed[lfn], errStr ) else: for lfn, error in res['Value']['Failed'].items(): - if not failed.has_key( lfn ): + if lfn not in failed: failed[lfn] = '' failed[lfn] = "%s %s" % ( failed[lfn], error ) completelyRemovedFiles = [] - for lfn in lfnDict.keys(): - if not failed.has_key( lfn ): + for lfn in lfnDict: + if lfn not in failed: completelyRemovedFiles.append( lfn ) if completelyRemovedFiles: res = self.fileCatalogue.removeFile( completelyRemovedFiles ) @@ -1852,9 +2014,9 @@ def removeReplica( self, storageElementName, lfn ): 'storageElementName' is the storage where the file is to be removed 'lfn' is the file to be removed """ - if type( lfn ) == types.ListType: + if type( lfn ) == ListType: lfns = lfn - elif type( lfn ) == types.StringType: + elif type( lfn ) == StringType: lfns = [lfn] else: errStr = "ReplicaManager.removeReplica: Supplied lfns must be string or list of strings." @@ -1868,7 +2030,8 @@ def removeReplica( self, storageElementName, lfn ): errStr = "ReplicaManager.__replicate: Write access not permitted for this credential." gLogger.error( errStr, lfns ) return S_ERROR( errStr ) - gLogger.verbose( "ReplicaManager.removeReplica: Attempting to remove catalogue entry for %s lfns at %s." % ( len( lfns ), storageElementName ) ) + gLogger.verbose( "ReplicaManager.removeReplica: Attempting to remove catalogue entry for %s lfns at %s." % ( len(lfns), + storageElementName ) ) res = self.fileCatalogue.getReplicas( lfns, True ) if not res['OK']: errStr = "ReplicaManager.removeReplica: Completely failed to get replicas for lfns." @@ -1936,9 +2099,9 @@ def __removeReplica( self, storageElementName, fileTuple ): def removeReplicaFromCatalog( self, storageElementName, lfn ): # Remove replica from the file catalog 'lfn' are the file to be removed 'storageElementName' is the storage where the file is to be removed - if type( lfn ) == types.ListType: + if type( lfn ) == ListType: lfns = lfn - elif type( lfn ) == types.StringType: + elif type( lfn ) == StringType: lfns = [lfn] else: errStr = "ReplicaManager.removeCatalogReplica: Supplied lfns must be string or list of strings." @@ -1973,9 +2136,9 @@ def removeCatalogPhysicalFileNames( self, replicaTuple ): 'replicaTuple' is a tuple containing the replica to be removed and is of the form (lfn,pfn,se) """ - if type( replicaTuple ) == types.ListType: + if type( replicaTuple ) == ListType: replicaTuples = replicaTuple - elif type( replicaTuple ) == types.TupleType: + elif type( replicaTuple ) == TupleType: replicaTuples = [replicaTuple] else: errStr = "ReplicaManager.removeCatalogPhysicalFileNames: Supplied info must be tuple or list of tuples." @@ -2020,9 +2183,9 @@ def removePhysicalReplica( self, storageElementName, lfn ): 'lfn' are the files to be removed 'storageElementName' is the storage where the file is to be removed """ - if type( lfn ) == types.ListType: + if type( lfn ) == ListType: lfns = lfn - elif type( lfn ) == types.StringType: + elif type( lfn ) == StringType: lfns = [lfn] else: errStr = "ReplicaManager.removePhysicalReplica: Supplied lfns must be string or list of strings." @@ -2036,7 +2199,8 @@ def removePhysicalReplica( self, storageElementName, lfn ): errStr = "ReplicaManager.__replicate: Write access not permitted for this credential." gLogger.error( errStr, lfns ) return S_ERROR( errStr ) - gLogger.verbose( "ReplicaManager.removePhysicalReplica: Attempting to remove %s lfns at %s." % ( len( lfns ), storageElementName ) ) + gLogger.verbose( "ReplicaManager.removePhysicalReplica: Attempting to remove %s lfns at %s." % ( len( lfns ), + storageElementName ) ) gLogger.verbose( "ReplicaManager.removePhysicalReplica: Attempting to resolve replicas." ) res = self.fileCatalogue.getReplicas( lfns ) if not res['OK']: @@ -2053,7 +2217,8 @@ def removePhysicalReplica( self, storageElementName, lfn ): else: sePfn = repDict[storageElementName] pfnDict[sePfn] = lfn - gLogger.verbose( "ReplicaManager.removePhysicalReplica: Resolved %s pfns for removal at %s." % ( len( pfnDict.keys() ), storageElementName ) ) + gLogger.verbose( "ReplicaManager.removePhysicalReplica: Resolved %s pfns for removal at %s." % ( len( pfnDict.keys() ), + storageElementName ) ) res = self.__removePhysicalReplica( storageElementName, pfnDict.keys() ) for pfn, error in res['Value']['Failed'].items(): failed[pfnDict[pfn]] = error @@ -2063,7 +2228,8 @@ def removePhysicalReplica( self, storageElementName, lfn ): return S_OK( resDict ) def __removePhysicalReplica( self, storageElementName, pfnsToRemove ): - gLogger.verbose( "ReplicaManager.__removePhysicalReplica: Attempting to remove %s pfns at %s." % ( len( pfnsToRemove ), storageElementName ) ) + gLogger.verbose( "ReplicaManager.__removePhysicalReplica: Attempting to remove %s pfns at %s." % ( len( pfnsToRemove ), + storageElementName ) ) storageElement = StorageElement( storageElementName, overwride = True ) res = storageElement.isValid() if not res['OK']: @@ -2104,6 +2270,10 @@ def __removePhysicalReplica( self, storageElementName, pfnsToRemove ): def put( self, lfn, file, diracSE, path = None ): """ Put a local file to a Storage Element + :param self: self reference + :param str lfn: LFN + :param : + 'lfn' is the file LFN 'file' is the full path to the local file 'diracSE' is the Storage Element to which to put the file @@ -2181,18 +2351,18 @@ def checkActiveReplicas( self, replicaDict ): """ Check a replica dictionary for active replicas """ - if type( replicaDict ) != types.DictType: - return S_ERROR( 'Wrong argument type %s, expected a Dictionary' % type( replicaDict ) ) + if type( replicaDict ) != DictType: + return S_ERROR( 'Wrong argument type %s, expected a dictionary' % type( replicaDict ) ) for key in [ 'Successful', 'Failed' ]: if not key in replicaDict: - return S_ERROR( 'Missing key "%s" in replica Dictionary' % key ) - if type( replicaDict[key] ) != types.DictType: - return S_ERROR( 'Wrong argument type %s, expected a Dictionary' % type( replicaDict[key] ) ) + return S_ERROR( 'Missing key "%s" in replica dictionary' % key ) + if type( replicaDict[key] ) != DictType: + return S_ERROR( 'Wrong argument type %s, expected a dictionary' % type( replicaDict[key] ) ) seReadStatus = {} for lfn, replicas in replicaDict['Successful'].items(): - if type( replicas ) != types.DictType: + if type( replicas ) != DictType: del replicaDict['Successful'][ lfn ] replicaDict['Failed'][lfn] = 'Wrong replica info' continue @@ -2258,7 +2428,7 @@ def onlineRetransfer( self, storageElementName, physicalFile ): 'storageElementName' is the storage element where the file should be removed from 'physicalFile' is the physical files """ - return self._executeStorageElementFunction( storageElementName, physicalFile, 'retransferOnlineFile' ) + return self._callStorageElementFcn( storageElementName, physicalFile, 'retransferOnlineFile' ) def getReplicas( self, lfn ): return self.getCatalogReplicas( lfn ) diff --git a/DataManagementSystem/scripts/dirac-transformation-resolve-problematics.py b/DataManagementSystem/scripts/dirac-transformation-resolve-problematics.py index 055fb75889a..250ae566382 100755 --- a/DataManagementSystem/scripts/dirac-transformation-resolve-problematics.py +++ b/DataManagementSystem/scripts/dirac-transformation-resolve-problematics.py @@ -2,7 +2,7 @@ ######################################################################## # $HeadURL: $ ######################################################################## -__RCSID__ = "$Id: $" +__RCSID__ = "$Id$" from DIRAC.Core.Base import Script @@ -16,16 +16,16 @@ Script.parseCommandLine() import DIRAC -from DIRAC import gLogger,S_OK,S_ERROR -from DIRAC.Core.Utilities.List import sortList +from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC.Core.Utilities.List import sortList from LHCbDIRAC.DataManagementSystem.Client.DataIntegrityClient import DataIntegrityClient integrityClient = DataIntegrityClient() -def resolveTransforamtionProblematics(transID): +def resolveTransforamtionProblematics( transID ): gLogger.notice("Obtaining problematic files for transformation %d" % transID) res = integrityClient.getTransformationProblematics(transID) if not res['OK']: - gLogger.error("Failed to get transformation problematic files",res['Message']) + gLogger.error("Failed to get transformation problematic files", res['Message']) return S_ERROR() problematicFiles = res['Value'] if not problematicFiles: @@ -34,15 +34,20 @@ def resolveTransforamtionProblematics(transID): for lfn in sortList(problematicFiles.keys()): prognosis = problematicFiles[lfn]['Prognosis'] problematicDict = problematicFiles[lfn] - execString = "res = integrityClient.resolve%s(problematicDict)" % prognosis - try: - exec(execString) - except AttributeError: - gLogger.error("Resolution method for %s not available" % prognosis) + gLogger.notice("Prognosis is %s" % prognosis ) + if not hasattr( integrityClient, methodToCall ): + gLogger.notice( "DataIntegrityClient hasn't got '%s' member" % methodToCall ) + continue + fcn = getattr( integrityClient, methodToCall ) + if not callable( fcn ): + gLogger.notice( "DataIntegrityClient member '%s' isn't a method" % methodToCall ) + continue + ## results not checked??? Where is The Food? + res = fcn( problematicDict ) gLogger.notice("Problematic files resolved for transformation %d" % transID) return S_OK() -transIDs = [int(x) for x in Script.getPositionalArgs()] +transIDs = [ int(x) for x in Script.getPositionalArgs() ] if not transIDs: gLogger.notice("Please supply transformationIDs as arguments") diff --git a/Resources/Catalog/LcgFileCatalogClient.py b/Resources/Catalog/LcgFileCatalogClient.py index 982fa41f465..caaa672190c 100755 --- a/Resources/Catalog/LcgFileCatalogClient.py +++ b/Resources/Catalog/LcgFileCatalogClient.py @@ -1072,21 +1072,19 @@ def __checkArgumentFormat( self, path ): def __executeOperation( self, path, method ): """ Executes the requested functionality with the supplied path """ - execString = "res = self.%s(path)" % method - try: - exec( execString ) - if type( path ) == types.DictType: - path = path.keys()[0] - if not res['OK']: - return res - elif not res['Value']['Successful'].has_key( path ): - return S_ERROR( res['Value']['Failed'][path] ) - else: - return S_OK( res['Value']['Successful'][path] ) - except AttributeError, errMessage: - exceptStr = "LcgFileCatalogClient.__executeOperation: Exception while perfoming %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) + fcn = None + if hasattr( self, method ) and callable( getattr(self, method) ): + fcn = getattr( self, method ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member function of LcgFileCatalogClient" % method ) + res = fcn( path ) + if type( path ) == types.DictType: + path = path.keys()[0] + if not res['OK']: + return res + elif path not in res['Value']['Successful']: + return S_ERROR( res['Value']['Failed'][path] ) + return S_OK( res['Value']['Successful'][path] ) def __getLFNForPFN( self, pfn ): fstat = lfc.lfc_filestatg() diff --git a/Resources/Storage/DIPStorage.py b/Resources/Storage/DIPStorage.py index 59c4e315645..9fe0caa206d 100755 --- a/Resources/Storage/DIPStorage.py +++ b/Resources/Storage/DIPStorage.py @@ -589,16 +589,18 @@ def __checkArgumentFormatDict( self, path ): def __executeOperation( self, url, method ): """ Executes the requested functionality with the supplied url """ - execString = "res = self.%s(url)" % method - try: - exec( execString ) - if not res['OK']: - return S_ERROR( res['Message'] ) - elif not res['Value']['Successful'].has_key( url ): - return S_ERROR( res['Value']['Failed'][url] ) - else: - return S_OK( res['Value']['Successful'][url] ) - except AttributeError, errMessage: - exceptStr = "DIPStorage.__executeOperation: Exception while perfoming %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) + fcn = None + if hasattr( self, method ) and callable( getattr( self, method ) ): + fcn = getattr( self, method ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member function of DIPStorage" % method ) + + res = fcn( url ) + if not res['OK']: + return res + elif url not in res['Value']['Successful']: + return S_ERROR( res['Value']['Failed'][url] ) + + return S_OK( res['Value']['Successful'][url] ) + + diff --git a/Resources/Storage/ProxyStorage.py b/Resources/Storage/ProxyStorage.py index 1155ba3e0cb..1230eebf47b 100644 --- a/Resources/Storage/ProxyStorage.py +++ b/Resources/Storage/ProxyStorage.py @@ -1,5 +1,8 @@ -""" This is the Proxy storage element client """ +################################################################################# +# $HeadURL $ +################################################################################# +""" This is the Proxy storage element client """ __RCSID__ = "$Id$" @@ -227,16 +230,15 @@ def __checkArgumentFormatDict( self, path ): def __executeOperation( self, url, method ): """ Executes the requested functionality with the supplied url """ - execString = "res = self.%s(url)" % method - try: - exec( execString ) - if not res['OK']: - return S_ERROR( res['Message'] ) - elif not res['Value']['Successful'].has_key( url ): - return S_ERROR( res['Value']['Failed'][url] ) - else: - return S_OK( res['Value']['Successful'][url] ) - except AttributeError, errMessage: - exceptStr = "ProxyStorage.__executeOperation: Exception while perfoming %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) + fcn = None + if hasattr( self, method ) and callable( getattr( self, method ) ): + fcn = getattr( self, method ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member function of ProxyStorage" % method ) + res = fcn( url ) + if not res['OK']: + return res + elif url not in res['Value']['Successful']: + return S_ERROR( res['Value']['Failed'][url] ) + return S_OK( res['Value']['Successful'][url] ) + diff --git a/Resources/Storage/RFIOStorage.py b/Resources/Storage/RFIOStorage.py index 90743dd40a8..5d8d1b85b92 100755 --- a/Resources/Storage/RFIOStorage.py +++ b/Resources/Storage/RFIOStorage.py @@ -1,3 +1,7 @@ +########################################################################### +# $HeadURL$ +########################################################################### + """ This is the RFIO StorageClass """ __RCSID__ = "$Id$" @@ -1077,16 +1081,15 @@ def __checkArgumentFormatDict( self, path ): def __executeOperation( self, url, method ): """ Executes the requested functionality with the supplied url """ - execString = "res = self.%s(url)" % method - try: - exec( execString ) - if not res['OK']: - return S_ERROR( res['Message'] ) - elif not res['Value']['Successful'].has_key( url ): + fcn = None + if hasattr(self, method) and callable( getattr(self, method) ): + fcn = getattr( self, method ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member funtion of RFIOStorage" % method ) + res = fcn( url ) + if not res['OK']: + return res + elif url not in res['Value']['Successful']: return S_ERROR( res['Value']['Failed'][url] ) - else: - return S_OK( res['Value']['Successful'][url] ) - except AttributeError, errMessage: - exceptStr = "RFIOStorage.__executeOperation: Exception while performing %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) + return S_OK( res['Value']['Successful'][url] ) + diff --git a/Resources/Storage/SRM2Storage.py b/Resources/Storage/SRM2Storage.py index 1058f353abf..49b72565e1f 100755 --- a/Resources/Storage/SRM2Storage.py +++ b/Resources/Storage/SRM2Storage.py @@ -827,28 +827,27 @@ def __getFile( self, src_url, dest_file ): def __executeOperation( self, url, method ): """ Executes the requested functionality with the supplied url """ - execString = "res = self.%s(url)" % method - try: - exec( execString ) - if not res['OK']: - return S_ERROR( res['Message'] ) - elif not res['Value']['Successful'].has_key( url ): - if not res['Value']['Failed'].has_key( url ): - if res['Value']['Failed'].values(): - return S_ERROR( res['Value']['Failed'].values()[0] ) - elif res['Value']['Successful'].values(): - return S_OK( res['Value']['Successful'].values()[0] ) - else: - gLogger.error( 'Wrong Return structure', str( res['Value'] ) ) - return S_ERROR( 'Wrong Return structure' ) - return S_ERROR( res['Value']['Failed'][url] ) - else: - return S_OK( res['Value']['Successful'][url] ) - except AttributeError, errMessage: - exceptStr = "SRM2Storage.__executeOperation: Exception while perfoming %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) + fcn = None + if hasattr( self, method ) and callable( getattr(self, method) ): + fcn = getattr( self, method ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member funtion of SRM2Storage" % method ) + res = fcn( url ) + if not res['OK']: + return res + elif url not in res['Value']['Successful']: + if url not in res['Value']['Failed']: + if res['Value']['Failed'].values(): + return S_ERROR( res['Value']['Failed'].values()[0] ) + elif res['Value']['Successful'].values(): + return S_OK( res['Value']['Successful'].values()[0] ) + else: + gLogger.error( 'Wrong Return structure', str( res['Value'] ) ) + return S_ERROR( 'Wrong Return structure' ) + return S_ERROR( res['Value']['Failed'][url] ) + return S_OK( res['Value']['Successful'][url] ) + ############################################################################################ # # Directory based methods @@ -1852,27 +1851,23 @@ def __gfal_exec( self, gfalObject, method, timeout_sendreceive = None ): """ gLogger.debug( "SRM2Storage.__gfal_exec: Performing %s." % method ) - execString = "errCode,gfalObject,errMessage = self.gfal.%s(gfalObject)" % method - try: - if timeout_sendreceive: - # For asynchronous methods this timeout defines how long the connection to set the - # request can take - self.gfal.gfal_set_timeout_sendreceive( timeout_sendreceive ) - exec( execString ) - if not errCode == 0: - errStr = "SRM2Storage.__gfal_exec: Failed to perform %s." % method - if not errMessage: - errMessage = os.strerror( errCode ) - gLogger.error( errStr, errMessage ) - return S_ERROR( "%s%s" % ( errStr, errMessage ) ) - else: - gLogger.debug( "SRM2Storage.__gfal_exec: Successfully performed %s." % method ) - return S_OK( gfalObject ) - except AttributeError, errMessage: - exceptStr = "SRM2Storage.__gfal_exec: Exception while perfoming %s." % method - gLogger.exception( exceptStr, '', errMessage ) - return S_ERROR( "%s%s" % ( exceptStr, errMessage ) ) - + fcn = None + if hasattr( self.gfal, method ) and callable( getattr( self.gfal, method) ): + fcn = getattr( self.gfal, method ) + if not fcn: + return S_ERROR( "Unable to invoke %s for gfal, it isn't a member function" % method ) + if timeout_sendreceive: + self.gfal.gfal_set_timeout_sendreceive( timeout_sendreceive ) + errCode, gfalObject, errMessage = fcn( gfalObject ) + if errCode: + errStr = "SRM2Storage.__gfal_exec: Failed to perform %s." % method + if not errMessage: + errMessage = os.strerror( errCode ) + gLogger.error( errStr, errMessage ) + return S_ERROR( "%s%s" % ( errStr, errMessage ) ) + gLogger.debug( "SRM2Storage.__gfal_exec: Successfully performed %s." % method ) + return S_OK( gfalObject ) + # These methods are for retrieving output information def __get_results( self, gfalObject ): diff --git a/Resources/Storage/StorageElement.py b/Resources/Storage/StorageElement.py index b7cdc0c5b9f..6f1f67cb2a1 100755 --- a/Resources/Storage/StorageElement.py +++ b/Resources/Storage/StorageElement.py @@ -607,26 +607,18 @@ def __executeFunction( self, pfn, method, argsDict = None ): gLogger.verbose( "StorageElement.__executeFunction No pfns generated for protocol %s." % protocolName ) else: gLogger.verbose( "StorageElement.__executeFunction: Attempting to perform '%s' for %s physical files." % ( method, len( pfnDict.keys() ) ) ) + fcn = None + if hasattr( storage, method ) and callable( getattr( storage, method ) ): + fcn = getattr( storage, method ) + if not fcn: + return S_ERROR("StorageElement.__executeFunction: unable to invoke %s, it isn't a member function of storage") + pfnsToUse = {} for pfn in pfnDict.keys(): pfnsToUse[pfn] = pfns[pfnDict[pfn]] - if argsDict: - execString = "res = storage.%s(pfnsToUse" % method - for argument, value in argsDict.items(): - if type( value ) == types.StringType: - execString = "%s, %s='%s'" % ( execString, argument, value ) - else: - execString = "%s, %s=%s" % ( execString, argument, value ) - execString = "%s)" % execString - else: - execString = "res = storage.%s(pfnsToUse)" % method - try: - exec( execString ) - except AttributeError, errMessage: - exceptStr = "StorageElement.__executeFunction: Exception while performing %s." % method - gLogger.exception( exceptStr, str( errMessage ) ) - res = S_ERROR( exceptStr ) - + + res = fcn( pfnsToUse, **argsDict ) + if not res['OK']: errStr = "StorageElement.__executeFunction: Completely failed to perform %s." % method gLogger.error( errStr, '%s for protocol %s: %s' % ( self.name, protocolName, res['Message'] ) ) diff --git a/StorageManagementSystem/Agent/RequestFinalizationAgent.py b/StorageManagementSystem/Agent/RequestFinalizationAgent.py index a10f4952a7f..5ef50bf1a6a 100755 --- a/StorageManagementSystem/Agent/RequestFinalizationAgent.py +++ b/StorageManagementSystem/Agent/RequestFinalizationAgent.py @@ -1,4 +1,6 @@ +########################################################################## # $HeadURL$ +########################################################################## __RCSID__ = "$Id$" @@ -111,9 +113,13 @@ def __performCallback( self, status, callback, sourceTask ): gLogger.debug( "RequestFinalization.__performCallback: Attempting to perform call back for %s with %s status" % ( sourceTask, status ) ) client = RPCClient( service ) gLogger.debug( "RequestFinalization.__performCallback: Created RPCClient to %s" % service ) - execString = "res = client.%s('%s','%s')" % ( method, sourceTask, status ) + fcn = None + if hasattr( client, method ) and callable( getattr( client, method ) ): + fcn = getattr( client, method ) + if not fcn: + return S_ERROR( "Unable to invoke %s, it isn't a member funtion of %s" % ( method, service ) ) gLogger.debug( "RequestFinalization.__performCallback: Attempting to invoke %s service method" % method ) - exec( execString ) + res = fcn( sourceTask, status ) if not res['OK']: gLogger.error( "RequestFinalization.__performCallback: Failed to perform callback", res['Message'] ) else: diff --git a/TransformationSystem/Client/Transformation.py b/TransformationSystem/Client/Transformation.py index 77f1261d6bb..cdf60fa5b97 100644 --- a/TransformationSystem/Client/Transformation.py +++ b/TransformationSystem/Client/Transformation.py @@ -1,14 +1,16 @@ -# $HeadURL: svn+ssh://svn.cern.ch/reps/dirac/DIRAC/trunk/DIRAC/Interfaces/API/Transformation.py $ -__RCSID__ = "$Id: Transformation.py 19505 $" +###################################################################################################### +# $HeadURL $ +###################################################################################################### +__RCSID__ = "$Id$" #from DIRAC.Core.Base import Script #Script.parseCommandLine() import string, os, shutil, types, pprint -from DIRAC import gConfig, gLogger, S_OK, S_ERROR -from DIRAC.Core.Base.API import API -from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient +from DIRAC import gConfig, gLogger, S_OK, S_ERROR +from DIRAC.Core.Base.API import API +from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient COMPONENT_NAME = 'Transformation' @@ -112,13 +114,13 @@ def __getParam( self ): return S_OK( self.paramTypes.keys() ) if self.item_called == 'Parameters': return S_OK( self.paramValues ) - if self.item_called in self.paramValues.keys(): + if self.item_called in self.paramValues: return S_OK( self.paramValues[self.item_called] ) raise AttributeError, "Unknown parameter for transformation: %s" % self.item_called def __setParam( self, value ): change = False - if self.item_called in self.paramTypes.keys(): + if self.item_called in self.paramTypes: oldValue = self.paramValues[self.item_called] if oldValue != value: if type( value ) in self.paramTypes[self.item_called]: @@ -156,8 +158,14 @@ def getTransformation( self, printOutput = False ): return res transParams = res['Value'] for paramName, paramValue in transParams.items(): - execString = "self.set%s(paramValue)" % paramName - exec( execString ) + setter = None + setterName = "set%s" % paramName + if hasattr( self, setterName ) and callable( getattr( self, setterName ) ): + setter = getattr( self, setterName ) + if not setterName: + gLogger.error("Unable to invoke setter %s, it isn't a member function" % setterName ) + continue + setter( paramValue ) if printOutput: gLogger.info( "No printing available yet" ) return S_OK( transParams ) @@ -219,8 +227,12 @@ def __executeOperation( self, operation, *parms, **kwds ): gLogger.fatal( "No TransformationID known" ) return S_ERROR() printOutput = kwds.pop( 'printOutput' ) - execString = "res = self.transClient.%s(transID,*parms,**kwds)" % operation - exec( execString ) + fcn = None + if hasattr( self.transClient, operation ) and callable( getattr( self.transClient, operation) ): + fcn = getattr( self.transClient, operation ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member funtion of TransformationClient" ) + res = fcn( transID, *param, **kwds ) if printOutput: self._prettyPrint( res ) return res @@ -349,8 +361,13 @@ def _checkCreation( self ): if not res['OK']: return res plugin = self.paramValues['Plugin'] - execString = "res = self._check%sPlugin()" % plugin - exec( execString ) + checkPlugin = "_check%sPlugin" % plugin + fcn = None + if hasattr( self, checkPlugin ) and callable( getattr( self, checkPlugin ) ): + fcn = getattr( self, checkPlugin ) + if not fcn: + return S_ERROR("Unable to invoke %s, it isn't a member function" % checkPlugin ) + res = fcn() return res def _checkBySizePlugin( self ): @@ -377,9 +394,14 @@ def _checkBroadcastPlugin( self ): if not res['OK']: return res paramValue = res['Value'] + setter = None + setterName = "set%s" % requiredParam + if hasattr( self, setterName ) and callable( getattr( self, setterName ) ): + setter = getattr( self, setterName ) + if not setter: + return S_ERROR("Unable to invoke %s, this function hasn't been implemented." % setterName ) ses = paramValue.replace( ',', ' ' ).split() - execString = "res = self.set%s(ses)" % requiredParam - exec( execString ) + res = setter( ses ) if not res['OK']: return res return S_OK() @@ -404,8 +426,13 @@ def __promptForParameter( self, parameter, choices = [], default = '', insert = gLogger.info( "%s will be set to '%s'" % ( parameter, res['Value'] ) ) paramValue = res['Value'] if insert: - execString = "res = self.set%s(paramValue)" % parameter - exec( execString ) + setter = None + setterName = "set%s" % parameter + if hasattr( self, setterName ) and callable( getattr( self, setterName ) ): + setter = getattr( self, setterName ) + if not setter: + return S_ERROR( "Unable to invoke %s, it isn't a member function of Tranferomation!" ) + res = setter( paramValue ) if not res['OK']: return res return S_OK( paramValue ) diff --git a/TransformationSystem/Service/TransformationManagerHandler.py b/TransformationSystem/Service/TransformationManagerHandler.py index c22f7d34dff..a81d9ac46de 100644 --- a/TransformationSystem/Service/TransformationManagerHandler.py +++ b/TransformationSystem/Service/TransformationManagerHandler.py @@ -460,8 +460,13 @@ def __getTableSummaryWeb( self, table, selectDict, sortList, startItem, maxItems else: orderAttribute = None # Get the columns that match the selection - execString = "res = database.get%s(condDict=selectDict,older=toDate, newer=fromDate, timeStamp=timeStamp, orderAttribute=orderAttribute)" % table - exec( execString ) + fcn = None + fcnName = "get%s" % table + if hasattr( database, fcnName ) and callable( getattr( database, fcnName ) ): + fcn = getattr( database, fcnName ) + if not fcn: + return S_ERROR( "Unable to invoke database.%s, it isn't a member function of database" % fcnName ) + res = fcn( condDict=selectDict, older=toDate, newer=fromDate, timeStamp=timeStamp, orderAttribute=orderAttribute ) if not res['OK']: return self._parseRes( res ) diff --git a/TransformationSystem/test/TestClientTransformation.py b/TransformationSystem/test/TestClientTransformation.py index 2e2ac931489..5a8a7a990d5 100755 --- a/TransformationSystem/test/TestClientTransformation.py +++ b/TransformationSystem/test/TestClientTransformation.py @@ -56,14 +56,23 @@ def test_SetGetReset(self): testValue = "'TestValue'" else: testValue = '99999' - execString = "res = oTrans.set%s(%s)" % (parameterName,testValue) - exec(execString) - self.assert_(res['OK']) - execString = "res = oTrans.get%s()" % (parameterName) - exec(execString) - self.assert_(res['OK']) - self.assertEqual(res['Value'],eval(testValue)) + ## set* + setterName = "set%s" % parameterName + self.assertEqual( hasattr( oTrans, setterName ), True ) + setter = getattr( oTrans, setterName ) + self.asserEqual( callable(setter), True ) + res = setter( testValue ) + self.assertEqual( res["OK"], True ) + ## get* + getterName = "get%s" % parameterName + self.assertEqual( hasattr( oTrans, getterName ), True ) + getter = getattr( oTrans, setterName ) + self.asserEqual( callable(getter), True ) + res = getter() + self.asserEqual( res["OK"], True ) + self.asserEqual( res["Value"], eval(testValue) ) + # Test that SEs delivered as a space or comma seperated string are resolved... stringSEs = 'CERN-USER, CNAF-USER GRIDKA-USER,IN2P3-USER' listSEs = stringSEs.replace(',',' ').split()