diff --git a/ConfigurationSystem/Client/CSAPI.py b/ConfigurationSystem/Client/CSAPI.py index 17dcc25dfdc..d7ff19ad20a 100644 --- a/ConfigurationSystem/Client/CSAPI.py +++ b/ConfigurationSystem/Client/CSAPI.py @@ -4,7 +4,7 @@ """ from DIRAC import gLogger, gConfig, S_OK, S_ERROR -from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient from DIRAC.Core.Utilities import List, Time from DIRAC.Core.Security.X509Chain import X509Chain from DIRAC.Core.Security import Locations @@ -83,7 +83,7 @@ def initialize(self): if not retVal['OK']: self.__initialized = S_ERROR("Master server is not known. Is everything initialized?") return self.__initialized - self.__rpcClient = RPCClient(gConfig.getValue("/DIRAC/Configuration/MasterServer", "")) + self.__rpcClient = ConfigurationServerClient(url=gConfig.getValue("/DIRAC/Configuration/MasterServer", "")) self.__csMod = Modificator(self.__rpcClient, "%s - %s - %s" % (self.__userGroup, self.__userDN, Time.dateTime().strftime("%Y-%m-%d %H:%M:%S"))) retVal = self.downloadCSData() diff --git a/ConfigurationSystem/Client/ConfigurationServerClient.py b/ConfigurationSystem/Client/ConfigurationServerClient.py new file mode 100644 index 00000000000..aeb3efd245c --- /dev/null +++ b/ConfigurationSystem/Client/ConfigurationServerClient.py @@ -0,0 +1,16 @@ +""" + Custom Client for Configuration System +""" + +from DIRAC.Core.Base.Client import Client + + +class ConfigurationServerClient(Client): + """ + Placeholder client to speak with ConfigurationServer. + """ + + def __init__(self, **kwargs): + if 'url' not in kwargs: + kwargs['url'] = 'Configuration/Server' + super(ConfigurationServerClient, self).__init__(**kwargs) diff --git a/ConfigurationSystem/private/Refresher.py b/ConfigurationSystem/private/Refresher.py index b6b4e3ecb00..8a99e0f81c6 100755 --- a/ConfigurationSystem/private/Refresher.py +++ b/ConfigurationSystem/private/Refresher.py @@ -1,5 +1,4 @@ """ Refresh local CS (if needed) - In every gConfig """ __RCSID__ = "$Id$" @@ -31,111 +30,93 @@ def _updateFromRemoteLocation(serviceClient): return retVal -class Refresher(threading.Thread): +class RefresherBase(): + """ + Code factorisation for the refresher + """ def __init__(self): - threading.Thread.__init__(self) - self.__automaticUpdate = False - self.__lastUpdateTime = 0 - self.__url = False - self.__refreshEnabled = True - self.__timeout = 60 - self.__callbacks = {'newVersion': []} - gEventDispatcher.registerEvent("CSNewVersion") + self._automaticUpdate = False + self._lastUpdateTime = 0 + self._url = False + self._refreshEnabled = True + self._timeout = 60 + self._callbacks = {'newVersion': []} random.seed() - self.__triggeredRefreshLock = LockRing.LockRing().getLock() + gEventDispatcher.registerEvent("CSNewVersion") def disable(self): - self.__refreshEnabled = False + """ + Disable the refresher and prevent any request to another server + """ + self._refreshEnabled = False def enable(self): - self.__refreshEnabled = True - if self.__lastRefreshExpired(): + """ + Enable the refresher and authorize request to another server + WARNING: It will not activate automatic updates, use autoRefreshAndPublish() for that + """ + self._refreshEnabled = True + if self._lastRefreshExpired(): return self.forceRefresh() return S_OK() def isEnabled(self): - return self.__refreshEnabled + """ + Returns if you can use refresher or not, use automaticUpdateEnabled() to know + if refresh is automatic. + """ + return self._refreshEnabled def addListenerToNewVersionEvent(self, functor): gEventDispatcher.addListener("CSNewVersion", functor) - def __refreshInThread(self): - retVal = self.__refresh() - if not retVal['OK']: - gLogger.error("Error while updating the configuration", retVal['Message']) - - def __lastRefreshExpired(self): - return time.time() - self.__lastUpdateTime >= gConfigurationData.getRefreshTime() - - def refreshConfigurationIfNeeded(self): - if not self.__refreshEnabled or self.__automaticUpdate or not gConfigurationData.getServers(): - return - self.__triggeredRefreshLock.acquire() - try: - if not self.__lastRefreshExpired(): - return - self.__lastUpdateTime = time.time() - finally: - try: - self.__triggeredRefreshLock.release() - except thread.error: - pass - # Launch the refresh - thd = threading.Thread(target=self.__refreshInThread) - thd.setDaemon(1) - thd.start() + def _lastRefreshExpired(self): + """ + Just returns if last refresh must be considered as expired or not + """ + return time.time() - self._lastUpdateTime >= gConfigurationData.getRefreshTime() def forceRefresh(self, fromMaster=False): - if self.__refreshEnabled: - return self.__refresh(fromMaster=fromMaster) + """ + Force refresh + WARNING: If refresher is disabled, force a refresh will do nothing + """ + if self._refreshEnabled: + return self._refresh(fromMaster=fromMaster) return S_OK() - def autoRefreshAndPublish(self, sURL): - gLogger.debug("Setting configuration refresh as automatic") - if not gConfigurationData.getAutoPublish(): - gLogger.debug("Slave server won't auto publish itself") - if not gConfigurationData.getName(): - import DIRAC - DIRAC.abort(10, "Missing configuration name!") - self.__url = sURL - self.__automaticUpdate = True - self.setDaemon(1) - self.start() - - def run(self): - while self.__automaticUpdate: - iWaitTime = gConfigurationData.getPropagationTime() - time.sleep(iWaitTime) - if self.__refreshEnabled: - if not self.__refreshAndPublish(): - gLogger.error("Can't refresh configuration from any source") - - def __refreshAndPublish(self): - self.__lastUpdateTime = time.time() + def _refreshAndPublish(self): + """ + Refresh configuration and publish local updates + """ + self._lastUpdateTime = time.time() gLogger.info("Refreshing from master server") - from DIRAC.Core.DISET.RPCClient import RPCClient sMasterServer = gConfigurationData.getMasterServer() if sMasterServer: - oClient = RPCClient(sMasterServer, timeout=self.__timeout, - useCertificates=gConfigurationData.useServerCertificate(), - skipCACheck=gConfigurationData.skipCACheck()) + from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient + oClient = ConfigurationServerClient(url=sMasterServer, timeout=self._timeout, + useCertificates=gConfigurationData.useServerCertificate(), + skipCACheck=gConfigurationData.skipCACheck()) dRetVal = _updateFromRemoteLocation(oClient) if not dRetVal['OK']: gLogger.error("Can't update from master server", dRetVal['Message']) return False if gConfigurationData.getAutoPublish(): gLogger.info("Publishing to master server...") - dRetVal = oClient.publishSlaveServer(self.__url) + dRetVal = oClient.publishSlaveServer(self._url) if not dRetVal['OK']: gLogger.error("Can't publish to master server", dRetVal['Message']) return True else: gLogger.warn("No master server is specified in the configuration, trying to get data from other slaves") - return self.__refresh()['OK'] + return self._refresh()['OK'] - def __refresh(self, fromMaster=False): - self.__lastUpdateTime = time.time() + def _refresh(self, fromMaster=False): + """ + Refresh configuration + """ + self._lastUpdateTime = time.time() gLogger.debug("Refreshing configuration...") gatewayList = getGatewayURLs("Configuration/Server") updatingErrorsList = [] @@ -158,10 +139,10 @@ def __refresh(self, fromMaster=False): gLogger.debug("Randomized server list is %s" % ", ".join(randomServerList)) for sServer in randomServerList: - from DIRAC.Core.DISET.RPCClient import RPCClient - oClient = RPCClient(sServer, - useCertificates=gConfigurationData.useServerCertificate(), - skipCACheck=gConfigurationData.skipCACheck()) + from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient + oClient = ConfigurationServerClient(url=sServer, + useCertificates=gConfigurationData.useServerCertificate(), + skipCACheck=gConfigurationData.skipCACheck()) dRetVal = _updateFromRemoteLocation(oClient) if dRetVal['OK']: return dRetVal @@ -172,13 +153,89 @@ def __refresh(self, fromMaster=False): break return S_ERROR("Reason(s):\n\t%s" % "\n\t".join(List.uniqueElements(updatingErrorsList))) + +class Refresher(RefresherBase, threading.Thread): + """ + The refresher + A long time ago, in a code away, far away... + A guy do the code to autorefresh the configuration + To prepare transition to HTTPS we have done separation + between the logic and the implementation of background + tasks, it's the original version, for diset, using thread. + + """ + + def __init__(self): + threading.Thread.__init__(self) + RefresherBase.__init__(self) + self._triggeredRefreshLock = LockRing.LockRing().getLock() + + def _refreshInThread(self): + """ + Refreshing configration in the background. By default it use a thread but it can be + also runned in the IOLoop + """ + retVal = self._refresh() + if not retVal['OK']: + gLogger.error("Error while updating the configuration", retVal['Message']) + + def refreshConfigurationIfNeeded(self): + """ + Refresh the configuration if automatic update are disabled, refresher is enabled and servers are defined + """ + if not self._refreshEnabled or self._automaticUpdate or not gConfigurationData.getServers(): + return + self._triggeredRefreshLock.acquire() + try: + if not self._lastRefreshExpired(): + return + self._lastUpdateTime = time.time() + finally: + try: + self._triggeredRefreshLock.release() + except thread.error: + pass + # Launch the refreshf + thd = threading.Thread(target=self._refreshInThread) + thd.setDaemon(1) + thd.start() + + def autoRefreshAndPublish(self, sURL): + """ + Start the autorefresh background task + + :param str sURL: URL of the configuration server + """ + gLogger.debug("Setting configuration refresh as automatic") + if not gConfigurationData.getAutoPublish(): + gLogger.debug("Slave server won't auto publish itself") + if not gConfigurationData.getName(): + import DIRAC + DIRAC.abort(10, "Missing configuration name!") + self._url = sURL + self._automaticUpdate = True + self.setDaemon(1) + self.start() + + def run(self): + while self._automaticUpdate: + time.sleep(gConfigurationData.getPropagationTime()) + if self._refreshEnabled: + if not self._refreshAndPublish(): + gLogger.error("Can't refresh configuration from any source") + def daemonize(self): + """ + Daemonize the background tasks + """ + self.setDaemon(1) self.start() gRefresher = Refresher() + if __name__ == "__main__": time.sleep(0.1) gRefresher.daemonize() diff --git a/ConfigurationSystem/private/ServiceInterface.py b/ConfigurationSystem/private/ServiceInterface.py index 37f016c2f74..40b9c624ca2 100755 --- a/ConfigurationSystem/private/ServiceInterface.py +++ b/ConfigurationSystem/private/ServiceInterface.py @@ -1,265 +1,327 @@ -""" Threaded implementation of services +""" +Service interface is the service who provide config for client and synchronize Master/Slave servers """ -import os +__RCSID__ = "$Id$" import time -import re import threading +import os +import re import zipfile import zlib import DIRAC +from DIRAC import gLogger from DIRAC.Core.Utilities.File import mkDir from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData, ConfigurationData from DIRAC.ConfigurationSystem.private.Refresher import gRefresher -from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR -from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient -__RCSID__ = "$Id$" +class ServiceInterfaceBase(object): + """Service interface is the service who provide config for client and synchronize Master/Slave servers""" -class ServiceInterface( threading.Thread ): - - def __init__( self, sURL ): - threading.Thread.__init__( self ) + def __init__(self, sURL): self.sURL = sURL - gLogger.info( "Initializing Configuration Service", "URL is %s" % sURL ) - self.__modificationsIgnoreMask = [ '/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version' ] + gLogger.info("Initializing Configuration Service", "URL is %s" % sURL) + self.__modificationsIgnoreMask = ['/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version'] gConfigurationData.setAsService() if not gConfigurationData.isMaster(): - gLogger.info( "Starting configuration service as slave" ) - gRefresher.autoRefreshAndPublish( self.sURL ) + gLogger.info("Starting configuration service as slave") + gRefresher.autoRefreshAndPublish(self.sURL) else: - gLogger.info( "Starting configuration service as master" ) + gLogger.info("Starting configuration service as master") gRefresher.disable() self.__loadConfigurationData() self.dAliveSlaveServers = {} - self.__launchCheckSlaves() - def isMaster( self ): + def isMaster(self): + """ + Just returns if master server or not + """ return gConfigurationData.isMaster() - def __launchCheckSlaves( self ): - gLogger.info( "Starting purge slaves thread" ) - self.setDaemon( 1 ) - self.start() - - def __loadConfigurationData( self ): - mkDir( os.path.join( DIRAC.rootPath, "etc", "csbackup" ) ) + def __loadConfigurationData(self): + """ + As the name said, it just load the configuration + """ + mkDir(os.path.join(DIRAC.rootPath, "etc", "csbackup")) gConfigurationData.loadConfigurationData() if gConfigurationData.isMaster(): bBuiltNewConfiguration = False if not gConfigurationData.getName(): - DIRAC.abort( 10, "Missing name for the configuration to be exported!" ) + DIRAC.abort(10, "Missing name for the configuration to be exported!") gConfigurationData.exportName() sVersion = gConfigurationData.getVersion() if sVersion == "0": - gLogger.info( "There's no version. Generating a new one" ) + gLogger.info("There's no version. Generating a new one") gConfigurationData.generateNewVersion() bBuiltNewConfiguration = True if self.sURL not in gConfigurationData.getServers(): - gConfigurationData.setServers( self.sURL ) + gConfigurationData.setServers(self.sURL) bBuiltNewConfiguration = True - gConfigurationData.setMasterServer( self.sURL ) + gConfigurationData.setMasterServer(self.sURL) if bBuiltNewConfiguration: gConfigurationData.writeRemoteConfigurationToDisk() - def __generateNewVersion( self ): + def __generateNewVersion(self): + """ + After changing configuration, we use this method to save them + """ if gConfigurationData.isMaster(): gConfigurationData.generateNewVersion() gConfigurationData.writeRemoteConfigurationToDisk() - def publishSlaveServer( self, sSlaveURL ): + def publishSlaveServer(self, sSlaveURL): + """ + Called by the slave server via service, it register a new slave server + + :param sSlaveURL: url of slave server + """ if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - gLogger.info( "Pinging slave %s" % sSlaveURL ) - rpcClient = RPCClient( sSlaveURL, timeout = 10, useCertificates = True ) + return S_ERROR("Configuration modification is not allowed in this server") + + gLogger.info("Pinging slave %s" % sSlaveURL) + + rpcClient = ConfigurationServerClient(url=sSlaveURL, timeout=10, useCertificates=True) retVal = rpcClient.ping() - if not retVal[ 'OK' ]: - gLogger.info( "Slave %s didn't reply" % sSlaveURL ) + if not retVal['OK']: + gLogger.info("Slave %s didn't reply" % sSlaveURL) return - if retVal[ 'Value' ][ 'name' ] != 'Configuration/Server': - gLogger.info( "Slave %s is not a CS serveR" % sSlaveURL ) + if retVal['Value']['name'] != 'Configuration/Server': + gLogger.info("Slave %s is not a CS serveR" % sSlaveURL) return bNewSlave = False - if not sSlaveURL in self.dAliveSlaveServers.keys(): + if sSlaveURL not in self.dAliveSlaveServers: bNewSlave = True - gLogger.info( "New slave registered", sSlaveURL ) - self.dAliveSlaveServers[ sSlaveURL ] = time.time() + gLogger.info("New slave registered", sSlaveURL) + self.dAliveSlaveServers[sSlaveURL] = time.time() if bNewSlave: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) self.__generateNewVersion() - def __checkSlavesStatus( self, forceWriteConfiguration = False ): - gLogger.info( "Checking status of slave servers" ) + def _checkSlavesStatus(self, forceWriteConfiguration=False): + """ + Check if Slaves server are still availlable + + :param forceWriteConfiguration=False: Force rewriting configuration after checking slaves + """ + gLogger.info("Checking status of slave servers") iGraceTime = gConfigurationData.getSlavesGraceTime() lSlaveURLs = self.dAliveSlaveServers.keys() bModifiedSlaveServers = False for sSlaveURL in lSlaveURLs: - if time.time() - self.dAliveSlaveServers[ sSlaveURL ] > iGraceTime: - gLogger.info( "Found dead slave", sSlaveURL ) - del self.dAliveSlaveServers[ sSlaveURL ] + if time.time() - self.dAliveSlaveServers[sSlaveURL] > iGraceTime: + gLogger.info("Found dead slave", sSlaveURL) + del self.dAliveSlaveServers[sSlaveURL] bModifiedSlaveServers = True if bModifiedSlaveServers or forceWriteConfiguration: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) self.__generateNewVersion() - def getCompressedConfiguration( self ): - sData = gConfigurationData.getCompressedData() - - def updateConfiguration( self, sBuffer, commiter = "", updateVersionOption = False ): + def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): + """ + Update the configuration + """ if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - #Load the data in a ConfigurationData object - oRemoteConfData = ConfigurationData( False ) - oRemoteConfData.loadRemoteCFGFromCompressedMem( sBuffer ) + return S_ERROR("Configuration modification is not allowed in this server") + # Load the data in a ConfigurationData object + oRemoteConfData = ConfigurationData(False) + oRemoteConfData.loadRemoteCFGFromCompressedMem(sBuffer) if updateVersionOption: - oRemoteConfData.setVersion( gConfigurationData.getVersion() ) - #Test that remote and new versions are the same + oRemoteConfData.setVersion(gConfigurationData.getVersion()) + # Test that remote and new versions are the same sRemoteVersion = oRemoteConfData.getVersion() sLocalVersion = gConfigurationData.getVersion() - gLogger.info( "Checking versions\nremote: %s\nlocal: %s" % ( sRemoteVersion, sLocalVersion ) ) + gLogger.info("Checking versions\nremote: %s\nlocal: %s" % (sRemoteVersion, sLocalVersion)) if sRemoteVersion != sLocalVersion: if not gConfigurationData.mergingEnabled(): - return S_ERROR( "Local and remote versions differ (%s vs %s). Cannot commit." % ( sLocalVersion, sRemoteVersion ) ) + return S_ERROR("Local and remote versions differ (%s vs %s). Cannot commit." % (sLocalVersion, sRemoteVersion)) else: - gLogger.info( "AutoMerging new data!" ) + gLogger.info("AutoMerging new data!") if updateVersionOption: - return S_ERROR( "Cannot AutoMerge! version was overwritten" ) - result = self.__mergeIndependentUpdates( oRemoteConfData ) - if not result[ 'OK' ]: - gLogger.warn( "Could not AutoMerge!", result[ 'Message' ] ) - return S_ERROR( "AutoMerge failed: %s" % result[ 'Message' ] ) - requestedRemoteCFG = result[ 'Value' ] - gLogger.info( "AutoMerge successful!" ) - oRemoteConfData.setRemoteCFG( requestedRemoteCFG ) - #Test that configuration names are the same + return S_ERROR("Cannot AutoMerge! version was overwritten") + result = self.__mergeIndependentUpdates(oRemoteConfData) + if not result['OK']: + gLogger.warn("Could not AutoMerge!", result['Message']) + return S_ERROR("AutoMerge failed: %s" % result['Message']) + requestedRemoteCFG = result['Value'] + gLogger.info("AutoMerge successful!") + oRemoteConfData.setRemoteCFG(requestedRemoteCFG) + # Test that configuration names are the same sRemoteName = oRemoteConfData.getName() sLocalName = gConfigurationData.getName() if sRemoteName != sLocalName: - return S_ERROR( "Names differ: Server is %s and remote is %s" % ( sLocalName, sRemoteName ) ) - #Update and generate a new version - gLogger.info( "Committing new data..." ) + return S_ERROR("Names differ: Server is %s and remote is %s" % (sLocalName, sRemoteName)) + # Update and generate a new version + gLogger.info("Committing new data...") gConfigurationData.lock() - gLogger.info( "Setting the new CFG" ) - gConfigurationData.setRemoteCFG( oRemoteConfData.getRemoteCFG() ) + gLogger.info("Setting the new CFG") + gConfigurationData.setRemoteCFG(oRemoteConfData.getRemoteCFG()) gConfigurationData.unlock() - gLogger.info( "Generating new version" ) + gLogger.info("Generating new version") gConfigurationData.generateNewVersion() - #self.__checkSlavesStatus( forceWriteConfiguration = True ) - gLogger.info( "Writing new version to disk!" ) - retVal = gConfigurationData.writeRemoteConfigurationToDisk( "%s@%s" % ( commiter, gConfigurationData.getVersion() ) ) - gLogger.info( "New version it is!" ) + # self._checkSlavesStatus( forceWriteConfiguration = True ) + gLogger.info("Writing new version to disk!") + retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) + gLogger.info("New version it is!") return retVal - def getCompressedConfigurationData( self ): + def getCompressedConfigurationData(self): + """ + Get the configuration + """ return gConfigurationData.getCompressedData() - def getVersion( self ): + def getVersion(self): + """ + Get the version + """ return gConfigurationData.getVersion() - def getCommitHistory( self ): - files = self.__getCfgBackups( gConfigurationData.getBackupDir() ) - backups = [ ".".join( fileName.split( "." )[1:-1] ).split( "@" ) for fileName in files ] + def getCommitHistory(self): + """ + Get the history + """ + files = self.__getCfgBackups(gConfigurationData.getBackupDir()) + backups = [".".join(fileName.split(".")[1:-1]).split("@") for fileName in files] return backups - def run( self ): - while True: - iWaitTime = gConfigurationData.getSlavesGraceTime() - time.sleep( iWaitTime ) - self.__checkSlavesStatus() - - def getVersionContents( self, date ): + def getVersionContents(self, date): + """ + Get an old version + """ backupDir = gConfigurationData.getBackupDir() - files = self.__getCfgBackups( backupDir, date ) + files = self.__getCfgBackups(backupDir, date) for fileName in files: - with zipfile.ZipFile( "%s/%s" % ( backupDir, fileName ), "r" ) as zFile: + with zipfile.ZipFile("%s/%s" % (backupDir, fileName), "r") as zFile: cfgName = zFile.namelist()[0] - retVal = S_OK( zlib.compress( zFile.read( cfgName ) , 9 ) ) + retVal = S_OK(zlib.compress(zFile.read(cfgName), 9)) return retVal - return S_ERROR( "Version %s does not exist" % date ) + return S_ERROR("Version %s does not exist" % date) - def __getCfgBackups( self, basePath, date = "", subPath = "" ): - rs = re.compile( r"^%s\..*%s.*\.zip$" % ( gConfigurationData.getName(), date ) ) - fsEntries = os.listdir( "%s/%s" % ( basePath, subPath ) ) - fsEntries.sort( reverse = True ) + def __getCfgBackups(self, basePath, date="", subPath=""): + """ + Get list of backup file + """ + rs = re.compile(r"^%s\..*%s.*\.zip$" % (gConfigurationData.getName(), date)) + fsEntries = os.listdir("%s/%s" % (basePath, subPath)) + fsEntries.sort(reverse=True) backupsList = [] for entry in fsEntries: - entryPath = "%s/%s/%s" % ( basePath, subPath, entry ) - if os.path.isdir( entryPath ): - backupsList.extend( self.__getCfgBackups( basePath, date, "%s/%s" % ( subPath, entry ) ) ) - elif os.path.isfile( entryPath ): - if rs.search( entry ): - backupsList.append( "%s/%s" % ( subPath, entry ) ) + entryPath = "%s/%s/%s" % (basePath, subPath, entry) + if os.path.isdir(entryPath): + backupsList.extend(self.__getCfgBackups(basePath, date, "%s/%s" % (subPath, entry))) + elif os.path.isfile(entryPath): + if rs.search(entry): + backupsList.append("%s/%s" % (subPath, entry)) return backupsList - def __getPreviousCFG( self, oRemoteConfData ): - remoteExpectedVersion = oRemoteConfData.getVersion() - backupsList = self.__getCfgBackups( gConfigurationData.getBackupDir(), date = oRemoteConfData.getVersion() ) + def __getPreviousCFG(self, oRemoteConfData): + """ + Get last configurtion + """ + # remoteExpectedVersion = oRemoteConfData.getVersion() + backupsList = self.__getCfgBackups(gConfigurationData.getBackupDir(), date=oRemoteConfData.getVersion()) if not backupsList: - return S_ERROR( "Could not AutoMerge. Could not retrieve original commiter's version" ) + return S_ERROR("Could not AutoMerge. Could not retrieve original commiter's version") prevRemoteConfData = ConfigurationData() backFile = backupsList[0] if backFile[0] == "/": - backFile = os.path.join( gConfigurationData.getBackupDir(), backFile[1:] ) + backFile = os.path.join(gConfigurationData.getBackupDir(), backFile[1:]) try: - prevRemoteConfData.loadConfigurationData( backFile ) + prevRemoteConfData.loadConfigurationData(backFile) except Exception as e: - return S_ERROR( "Could not load original commiter's version: %s" % str( e ) ) - gLogger.info( "Loaded client original version %s" % prevRemoteConfData.getVersion() ) - return S_OK( prevRemoteConfData.getRemoteCFG() ) + return S_ERROR("Could not load original commiter's version: %s" % str(e)) + gLogger.info("Loaded client original version %s" % prevRemoteConfData.getVersion()) + return S_OK(prevRemoteConfData.getRemoteCFG()) - def _checkConflictsInModifications( self, realModList, reqModList, parentSection = "" ): - realModifiedSections = dict( [ ( modAc[1], modAc[3] ) for modAc in realModList if modAc[0].find( 'Sec' ) == len( modAc[0] ) - 3 ] ) - reqOptionsModificationList = dict( [ ( modAc[1], modAc[3] ) for modAc in reqModList if modAc[0].find( 'Opt' ) == len( modAc[0] ) - 3 ] ) - optionModRequests = 0 + def _checkConflictsInModifications(self, realModList, reqModList, parentSection=""): + """ + Check for conflicts + """ + realModifiedSections = dict([(modAc[1], modAc[3]) + for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) + # reqOptionsModificationList = dict([(modAc[1], modAc[3]) + # for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) + # optionModRequests = 0 for modAc in reqModList: action = modAc[0] objectName = modAc[1] if action == "addSec": if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s already exists" % ( parentSection, objectName ) ) + return S_ERROR("Section %s/%s already exists" % (parentSection, objectName)) elif action == "delSec": if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s cannot be deleted. It has been modified." % ( parentSection, objectName ) ) + return S_ERROR("Section %s/%s cannot be deleted. It has been modified." % (parentSection, objectName)) elif action == "modSec": if objectName in realModifiedSections: - result = self._checkConflictsInModifications( realModifiedSections[ objectName ], - modAc[3], "%s/%s" % ( parentSection, objectName ) ) - if not result[ 'OK' ]: + result = self._checkConflictsInModifications(realModifiedSections[objectName], + modAc[3], "%s/%s" % (parentSection, objectName)) + if not result['OK']: return result for modAc in realModList: action = modAc[0] objectName = modAc[1] - if action.find( "Opt" ) == len( action ) - 3: - return S_ERROR( "Section %s cannot be merged. Option %s/%s has been modified" % ( parentSection, parentSection, objectName ) ) + if action.find("Opt") == len(action) - 3: + return S_ERROR( + "Section %s cannot be merged. Option %s/%s has been modified" % + (parentSection, parentSection, objectName)) return S_OK() - def __mergeIndependentUpdates( self, oRemoteConfData ): - #return S_ERROR( "AutoMerge is still not finished. Meanwhile... why don't you get the newest conf and update from there?" ) - #Get all the CFGs + def __mergeIndependentUpdates(self, oRemoteConfData): + """ + Merge configuration + """ + # Get all the CFGs curSrvCFG = gConfigurationData.getRemoteCFG().clone() curCliCFG = oRemoteConfData.getRemoteCFG().clone() - result = self.__getPreviousCFG( oRemoteConfData ) - if not result[ 'OK' ]: + result = self.__getPreviousCFG(oRemoteConfData) + if not result['OK']: return result - prevCliCFG = result[ 'Value' ] - #Try to merge curCli with curSrv. To do so we check the updates from + prevCliCFG = result['Value'] + # Try to merge curCli with curSrv. To do so we check the updates from # prevCli -> curSrv VS prevCli -> curCli - prevCliToCurCliModList = prevCliCFG.getModifications( curCliCFG ) - prevCliToCurSrvModList = prevCliCFG.getModifications( curSrvCFG ) - result = self._checkConflictsInModifications( prevCliToCurSrvModList, - prevCliToCurCliModList ) - if not result[ 'OK' ]: - return S_ERROR( "Cannot AutoMerge: %s" % result[ 'Message' ] ) - #Merge! - result = curSrvCFG.applyModifications( prevCliToCurCliModList ) - if not result[ 'OK' ]: + prevCliToCurCliModList = prevCliCFG.getModifications(curCliCFG) + prevCliToCurSrvModList = prevCliCFG.getModifications(curSrvCFG) + result = self._checkConflictsInModifications(prevCliToCurSrvModList, + prevCliToCurCliModList) + if not result['OK']: + return S_ERROR("Cannot AutoMerge: %s" % result['Message']) + # Merge! + result = curSrvCFG.applyModifications(prevCliToCurCliModList) + if not result['OK']: return result - return S_OK( curSrvCFG ) + return S_OK(curSrvCFG) + + +class ServiceInterface(ServiceInterfaceBase, threading.Thread): + """ + Service interface, manage Slave/Master server for CS + Thread components + """ + + def __init__(self, sURL): + threading.Thread.__init__(self) + ServiceInterfaceBase.__init__(self, sURL) + self.__launchCheckSlaves() + + def __launchCheckSlaves(self): + """ + Start loop who check if slaves are alive + """ + gLogger.info("Starting purge slaves thread") + self.setDaemon(1) + self.start() + + def run(self): + while True: + iWaitTime = gConfigurationData.getSlavesGraceTime() + time.sleep(iWaitTime) + self._checkSlavesStatus() diff --git a/Core/Base/Client.py b/Core/Base/Client.py index 663294b70f0..52a25624d9a 100644 --- a/Core/Base/Client.py +++ b/Core/Base/Client.py @@ -7,7 +7,6 @@ from DIRAC.Core.DISET.RPCClient import RPCClient - class Client(object): """ Simple class to redirect unknown actions directly to the server. Arguments to the constructor are passed to the RPCClient constructor as they are. @@ -23,7 +22,7 @@ def __init__(self, **kwargs): :param kwargs: just stored as an attribute and passed when creating the RPCClient """ - self.serverURL = None + self.serverURL = kwargs.pop('url', None) self.call = None # I suppose it is initialized here to make pylint happy self.__kwargs = kwargs diff --git a/Core/DISET/RequestHandler.py b/Core/DISET/RequestHandler.py index 73d9ecc0c2b..3a0a21e0fdc 100755 --- a/Core/DISET/RequestHandler.py +++ b/Core/DISET/RequestHandler.py @@ -253,7 +253,7 @@ def __RPCCallFunction(self, method, args): try: # Get the method we are trying to call oMethod = getattr(self, realMethod) - except: + except BaseException: return S_ERROR("Unknown method %s" % method) # Check if the client sends correct arguments dRetVal = self.__checkExpectedArgumentTypes(method, args) @@ -290,7 +290,7 @@ def __checkExpectedArgumentTypes(self, method, args): sListName = "types_%s" % method try: oTypesList = getattr(self, sListName) - except: + except BaseException: gLogger.error("There's no types info for method", "export_%s" % method) return S_ERROR("Handler error for server %s while processing method %s" % (self.serviceInfoDict['serviceName'], method)) @@ -361,7 +361,7 @@ def _rh_executeConnectionCallback(self, methodName, args=False): gLogger.debug("Callback to %s" % realMethod) try: oMethod = getattr(self, realMethod) - except: + except BaseException: # No callback defined by handler return S_OK() try: @@ -383,7 +383,7 @@ def _rh_executeMessageCallback(self, msgObj): startTime = time.time() try: oMethod = getattr(self, methodName) - except: + except BaseException: return S_ERROR("Handler function for message %s does not exist!" % msgName) self.__lockManager.lock(methodName) try: @@ -466,7 +466,7 @@ def export_ping(self): with open("/proc/uptime") as oFD: iUptime = long(float(oFD.readline().split()[0].strip())) dInfo['host uptime'] = iUptime - except: + except BaseException: pass startTime = self.serviceInfoDict['serviceStartTime'] dInfo['service start time'] = self.serviceInfoDict['serviceStartTime'] @@ -477,7 +477,7 @@ def export_ping(self): with open("/proc/loadavg") as oFD: sLine = oFD.readline() dInfo['load'] = " ".join(sLine.split()[:3]) - except: + except BaseException: pass dInfo['name'] = self.serviceInfoDict['serviceName'] stTimes = os.times() @@ -490,6 +490,18 @@ def export_ping(self): return S_OK(dInfo) + types_whoami = [] + auth_whoami = ['all'] + + def export_whoami(self): + """ + A simple whoami, returns all credential dictionnary, except certificate chain object. + """ + credDict = self.srv_getRemoteCredentials() + # Remove the certificate chain (client already have it) + credDict.pop('x509Chain', None) + return S_OK(credDict) + types_echo = [basestring] @staticmethod