From fe2f8df3741f9fb37db24e03d98ff2d382a2a02f Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Thu, 16 Aug 2018 15:10:44 +0200 Subject: [PATCH 1/7] Core: use url in Base.Client constructor CS: add specific client and use it in CSAPI --- ConfigurationSystem/Client/CSAPI.py | 4 ++-- .../Client/ConfigurationServerClient.py | 13 +++++++++++++ Core/Base/Client.py | 3 +-- 3 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 ConfigurationSystem/Client/ConfigurationServerClient.py diff --git a/ConfigurationSystem/Client/CSAPI.py b/ConfigurationSystem/Client/CSAPI.py index 17dcc25dfdc..1902be16299 100644 --- a/ConfigurationSystem/Client/CSAPI.py +++ b/ConfigurationSystem/Client/CSAPI.py @@ -4,7 +4,7 @@ """ from DIRAC import gLogger, gConfig, S_OK, S_ERROR -from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient from DIRAC.Core.Utilities import List, Time from DIRAC.Core.Security.X509Chain import X509Chain from DIRAC.Core.Security import Locations @@ -83,7 +83,7 @@ def initialize(self): if not retVal['OK']: self.__initialized = S_ERROR("Master server is not known. Is everything initialized?") return self.__initialized - self.__rpcClient = RPCClient(gConfig.getValue("/DIRAC/Configuration/MasterServer", "")) + self.__rpcClient = ConfigurationServerClient(url=gConfig.getValue( "/DIRAC/Configuration/MasterServer", "" )) self.__csMod = Modificator(self.__rpcClient, "%s - %s - %s" % (self.__userGroup, self.__userDN, Time.dateTime().strftime("%Y-%m-%d %H:%M:%S"))) retVal = self.downloadCSData() diff --git a/ConfigurationSystem/Client/ConfigurationServerClient.py b/ConfigurationSystem/Client/ConfigurationServerClient.py new file mode 100644 index 00000000000..cd834275b6b --- /dev/null +++ b/ConfigurationSystem/Client/ConfigurationServerClient.py @@ -0,0 +1,13 @@ +""" + Custom Client for Configuration System +""" + +from DIRAC.Core.Base.Client import Client + + +class ConfigurationServerClient(Client): + """ + Placeholder client to speak with ConfigurationServer. + """ + + pass diff --git a/Core/Base/Client.py b/Core/Base/Client.py index 663294b70f0..52a25624d9a 100644 --- a/Core/Base/Client.py +++ b/Core/Base/Client.py @@ -7,7 +7,6 @@ from DIRAC.Core.DISET.RPCClient import RPCClient - class Client(object): """ Simple class to redirect unknown actions directly to the server. Arguments to the constructor are passed to the RPCClient constructor as they are. @@ -23,7 +22,7 @@ def __init__(self, **kwargs): :param kwargs: just stored as an attribute and passed when creating the RPCClient """ - self.serverURL = None + self.serverURL = kwargs.pop('url', None) self.call = None # I suppose it is initialized here to make pylint happy self.__kwargs = kwargs From c5da48d59e79c087fc4d46bee797cbdcd3e12aca Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Thu, 16 Aug 2018 15:41:57 +0200 Subject: [PATCH 2/7] Prepare the refresher for HTTPS --- ConfigurationSystem/private/Refresher.py | 209 ++++++++++++++--------- 1 file changed, 133 insertions(+), 76 deletions(-) diff --git a/ConfigurationSystem/private/Refresher.py b/ConfigurationSystem/private/Refresher.py index b6b4e3ecb00..8a99e0f81c6 100755 --- a/ConfigurationSystem/private/Refresher.py +++ b/ConfigurationSystem/private/Refresher.py @@ -1,5 +1,4 @@ """ Refresh local CS (if needed) - In every gConfig """ __RCSID__ = "$Id$" @@ -31,111 +30,93 @@ def _updateFromRemoteLocation(serviceClient): return retVal -class Refresher(threading.Thread): +class RefresherBase(): + """ + Code factorisation for the refresher + """ def __init__(self): - threading.Thread.__init__(self) - self.__automaticUpdate = False - self.__lastUpdateTime = 0 - self.__url = False - self.__refreshEnabled = True - self.__timeout = 60 - self.__callbacks = {'newVersion': []} - gEventDispatcher.registerEvent("CSNewVersion") + self._automaticUpdate = False + self._lastUpdateTime = 0 + self._url = False + self._refreshEnabled = True + self._timeout = 60 + self._callbacks = {'newVersion': []} random.seed() - self.__triggeredRefreshLock = LockRing.LockRing().getLock() + gEventDispatcher.registerEvent("CSNewVersion") def disable(self): - self.__refreshEnabled = False + """ + Disable the refresher and prevent any request to another server + """ + self._refreshEnabled = False def enable(self): - self.__refreshEnabled = True - if self.__lastRefreshExpired(): + """ + Enable the refresher and authorize request to another server + WARNING: It will not activate automatic updates, use autoRefreshAndPublish() for that + """ + self._refreshEnabled = True + if self._lastRefreshExpired(): return self.forceRefresh() return S_OK() def isEnabled(self): - return self.__refreshEnabled + """ + Returns if you can use refresher or not, use automaticUpdateEnabled() to know + if refresh is automatic. + """ + return self._refreshEnabled def addListenerToNewVersionEvent(self, functor): gEventDispatcher.addListener("CSNewVersion", functor) - def __refreshInThread(self): - retVal = self.__refresh() - if not retVal['OK']: - gLogger.error("Error while updating the configuration", retVal['Message']) - - def __lastRefreshExpired(self): - return time.time() - self.__lastUpdateTime >= gConfigurationData.getRefreshTime() - - def refreshConfigurationIfNeeded(self): - if not self.__refreshEnabled or self.__automaticUpdate or not gConfigurationData.getServers(): - return - self.__triggeredRefreshLock.acquire() - try: - if not self.__lastRefreshExpired(): - return - self.__lastUpdateTime = time.time() - finally: - try: - self.__triggeredRefreshLock.release() - except thread.error: - pass - # Launch the refresh - thd = threading.Thread(target=self.__refreshInThread) - thd.setDaemon(1) - thd.start() + def _lastRefreshExpired(self): + """ + Just returns if last refresh must be considered as expired or not + """ + return time.time() - self._lastUpdateTime >= gConfigurationData.getRefreshTime() def forceRefresh(self, fromMaster=False): - if self.__refreshEnabled: - return self.__refresh(fromMaster=fromMaster) + """ + Force refresh + WARNING: If refresher is disabled, force a refresh will do nothing + """ + if self._refreshEnabled: + return self._refresh(fromMaster=fromMaster) return S_OK() - def autoRefreshAndPublish(self, sURL): - gLogger.debug("Setting configuration refresh as automatic") - if not gConfigurationData.getAutoPublish(): - gLogger.debug("Slave server won't auto publish itself") - if not gConfigurationData.getName(): - import DIRAC - DIRAC.abort(10, "Missing configuration name!") - self.__url = sURL - self.__automaticUpdate = True - self.setDaemon(1) - self.start() - - def run(self): - while self.__automaticUpdate: - iWaitTime = gConfigurationData.getPropagationTime() - time.sleep(iWaitTime) - if self.__refreshEnabled: - if not self.__refreshAndPublish(): - gLogger.error("Can't refresh configuration from any source") - - def __refreshAndPublish(self): - self.__lastUpdateTime = time.time() + def _refreshAndPublish(self): + """ + Refresh configuration and publish local updates + """ + self._lastUpdateTime = time.time() gLogger.info("Refreshing from master server") - from DIRAC.Core.DISET.RPCClient import RPCClient sMasterServer = gConfigurationData.getMasterServer() if sMasterServer: - oClient = RPCClient(sMasterServer, timeout=self.__timeout, - useCertificates=gConfigurationData.useServerCertificate(), - skipCACheck=gConfigurationData.skipCACheck()) + from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient + oClient = ConfigurationServerClient(url=sMasterServer, timeout=self._timeout, + useCertificates=gConfigurationData.useServerCertificate(), + skipCACheck=gConfigurationData.skipCACheck()) dRetVal = _updateFromRemoteLocation(oClient) if not dRetVal['OK']: gLogger.error("Can't update from master server", dRetVal['Message']) return False if gConfigurationData.getAutoPublish(): gLogger.info("Publishing to master server...") - dRetVal = oClient.publishSlaveServer(self.__url) + dRetVal = oClient.publishSlaveServer(self._url) if not dRetVal['OK']: gLogger.error("Can't publish to master server", dRetVal['Message']) return True else: gLogger.warn("No master server is specified in the configuration, trying to get data from other slaves") - return self.__refresh()['OK'] + return self._refresh()['OK'] - def __refresh(self, fromMaster=False): - self.__lastUpdateTime = time.time() + def _refresh(self, fromMaster=False): + """ + Refresh configuration + """ + self._lastUpdateTime = time.time() gLogger.debug("Refreshing configuration...") gatewayList = getGatewayURLs("Configuration/Server") updatingErrorsList = [] @@ -158,10 +139,10 @@ def __refresh(self, fromMaster=False): gLogger.debug("Randomized server list is %s" % ", ".join(randomServerList)) for sServer in randomServerList: - from DIRAC.Core.DISET.RPCClient import RPCClient - oClient = RPCClient(sServer, - useCertificates=gConfigurationData.useServerCertificate(), - skipCACheck=gConfigurationData.skipCACheck()) + from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient + oClient = ConfigurationServerClient(url=sServer, + useCertificates=gConfigurationData.useServerCertificate(), + skipCACheck=gConfigurationData.skipCACheck()) dRetVal = _updateFromRemoteLocation(oClient) if dRetVal['OK']: return dRetVal @@ -172,13 +153,89 @@ def __refresh(self, fromMaster=False): break return S_ERROR("Reason(s):\n\t%s" % "\n\t".join(List.uniqueElements(updatingErrorsList))) + +class Refresher(RefresherBase, threading.Thread): + """ + The refresher + A long time ago, in a code away, far away... + A guy do the code to autorefresh the configuration + To prepare transition to HTTPS we have done separation + between the logic and the implementation of background + tasks, it's the original version, for diset, using thread. + + """ + + def __init__(self): + threading.Thread.__init__(self) + RefresherBase.__init__(self) + self._triggeredRefreshLock = LockRing.LockRing().getLock() + + def _refreshInThread(self): + """ + Refreshing configration in the background. By default it use a thread but it can be + also runned in the IOLoop + """ + retVal = self._refresh() + if not retVal['OK']: + gLogger.error("Error while updating the configuration", retVal['Message']) + + def refreshConfigurationIfNeeded(self): + """ + Refresh the configuration if automatic update are disabled, refresher is enabled and servers are defined + """ + if not self._refreshEnabled or self._automaticUpdate or not gConfigurationData.getServers(): + return + self._triggeredRefreshLock.acquire() + try: + if not self._lastRefreshExpired(): + return + self._lastUpdateTime = time.time() + finally: + try: + self._triggeredRefreshLock.release() + except thread.error: + pass + # Launch the refreshf + thd = threading.Thread(target=self._refreshInThread) + thd.setDaemon(1) + thd.start() + + def autoRefreshAndPublish(self, sURL): + """ + Start the autorefresh background task + + :param str sURL: URL of the configuration server + """ + gLogger.debug("Setting configuration refresh as automatic") + if not gConfigurationData.getAutoPublish(): + gLogger.debug("Slave server won't auto publish itself") + if not gConfigurationData.getName(): + import DIRAC + DIRAC.abort(10, "Missing configuration name!") + self._url = sURL + self._automaticUpdate = True + self.setDaemon(1) + self.start() + + def run(self): + while self._automaticUpdate: + time.sleep(gConfigurationData.getPropagationTime()) + if self._refreshEnabled: + if not self._refreshAndPublish(): + gLogger.error("Can't refresh configuration from any source") + def daemonize(self): + """ + Daemonize the background tasks + """ + self.setDaemon(1) self.start() gRefresher = Refresher() + if __name__ == "__main__": time.sleep(0.1) gRefresher.daemonize() From 433fdbd316b27a74bcac89555a84dab35ad9fe01 Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Thu, 16 Aug 2018 15:45:48 +0200 Subject: [PATCH 3/7] Separating threads in Service Interface and remove RPCClient --- .../private/ServiceInterface.py | 272 ++-------------- .../private/ServiceInterfaceBase.py | 292 ++++++++++++++++++ 2 files changed, 314 insertions(+), 250 deletions(-) create mode 100644 ConfigurationSystem/private/ServiceInterfaceBase.py diff --git a/ConfigurationSystem/private/ServiceInterface.py b/ConfigurationSystem/private/ServiceInterface.py index 37f016c2f74..3ed1a132bad 100755 --- a/ConfigurationSystem/private/ServiceInterface.py +++ b/ConfigurationSystem/private/ServiceInterface.py @@ -1,265 +1,37 @@ -""" Threaded implementation of services +""" Threaded implementation of service interface """ -import os import time -import re import threading -import zipfile -import zlib +from DIRAC import gLogger -import DIRAC -from DIRAC.Core.Utilities.File import mkDir -from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData, ConfigurationData -from DIRAC.ConfigurationSystem.private.Refresher import gRefresher -from DIRAC.FrameworkSystem.Client.Logger import gLogger -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR -from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.ConfigurationSystem.private.ServiceInterfaceBase import ServiceInterfaceBase +from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData __RCSID__ = "$Id$" -class ServiceInterface( threading.Thread ): +class ServiceInterface(ServiceInterfaceBase, threading.Thread): + """ + Service interface, manage Slave/Master server for CS + Thread components + """ - def __init__( self, sURL ): - threading.Thread.__init__( self ) - self.sURL = sURL - gLogger.info( "Initializing Configuration Service", "URL is %s" % sURL ) - self.__modificationsIgnoreMask = [ '/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version' ] - gConfigurationData.setAsService() - if not gConfigurationData.isMaster(): - gLogger.info( "Starting configuration service as slave" ) - gRefresher.autoRefreshAndPublish( self.sURL ) - else: - gLogger.info( "Starting configuration service as master" ) - gRefresher.disable() - self.__loadConfigurationData() - self.dAliveSlaveServers = {} - self.__launchCheckSlaves() + def __init__(self, sURL): + threading.Thread.__init__(self) + ServiceInterfaceBase.__init__(self, sURL) + self.__launchCheckSlaves() - def isMaster( self ): - return gConfigurationData.isMaster() - - def __launchCheckSlaves( self ): - gLogger.info( "Starting purge slaves thread" ) - self.setDaemon( 1 ) + def __launchCheckSlaves(self): + """ + Start loop who check if slaves are alive + """ + gLogger.info("Starting purge slaves thread") + self.setDaemon(1) self.start() - def __loadConfigurationData( self ): - mkDir( os.path.join( DIRAC.rootPath, "etc", "csbackup" ) ) - gConfigurationData.loadConfigurationData() - if gConfigurationData.isMaster(): - bBuiltNewConfiguration = False - if not gConfigurationData.getName(): - DIRAC.abort( 10, "Missing name for the configuration to be exported!" ) - gConfigurationData.exportName() - sVersion = gConfigurationData.getVersion() - if sVersion == "0": - gLogger.info( "There's no version. Generating a new one" ) - gConfigurationData.generateNewVersion() - bBuiltNewConfiguration = True - - if self.sURL not in gConfigurationData.getServers(): - gConfigurationData.setServers( self.sURL ) - bBuiltNewConfiguration = True - - gConfigurationData.setMasterServer( self.sURL ) - - if bBuiltNewConfiguration: - gConfigurationData.writeRemoteConfigurationToDisk() - - def __generateNewVersion( self ): - if gConfigurationData.isMaster(): - gConfigurationData.generateNewVersion() - gConfigurationData.writeRemoteConfigurationToDisk() - - def publishSlaveServer( self, sSlaveURL ): - if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - gLogger.info( "Pinging slave %s" % sSlaveURL ) - rpcClient = RPCClient( sSlaveURL, timeout = 10, useCertificates = True ) - retVal = rpcClient.ping() - if not retVal[ 'OK' ]: - gLogger.info( "Slave %s didn't reply" % sSlaveURL ) - return - if retVal[ 'Value' ][ 'name' ] != 'Configuration/Server': - gLogger.info( "Slave %s is not a CS serveR" % sSlaveURL ) - return - bNewSlave = False - if not sSlaveURL in self.dAliveSlaveServers.keys(): - bNewSlave = True - gLogger.info( "New slave registered", sSlaveURL ) - self.dAliveSlaveServers[ sSlaveURL ] = time.time() - if bNewSlave: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) - self.__generateNewVersion() - - def __checkSlavesStatus( self, forceWriteConfiguration = False ): - gLogger.info( "Checking status of slave servers" ) - iGraceTime = gConfigurationData.getSlavesGraceTime() - lSlaveURLs = self.dAliveSlaveServers.keys() - bModifiedSlaveServers = False - for sSlaveURL in lSlaveURLs: - if time.time() - self.dAliveSlaveServers[ sSlaveURL ] > iGraceTime: - gLogger.info( "Found dead slave", sSlaveURL ) - del self.dAliveSlaveServers[ sSlaveURL ] - bModifiedSlaveServers = True - if bModifiedSlaveServers or forceWriteConfiguration: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) - self.__generateNewVersion() - - def getCompressedConfiguration( self ): - sData = gConfigurationData.getCompressedData() - - def updateConfiguration( self, sBuffer, commiter = "", updateVersionOption = False ): - if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - #Load the data in a ConfigurationData object - oRemoteConfData = ConfigurationData( False ) - oRemoteConfData.loadRemoteCFGFromCompressedMem( sBuffer ) - if updateVersionOption: - oRemoteConfData.setVersion( gConfigurationData.getVersion() ) - #Test that remote and new versions are the same - sRemoteVersion = oRemoteConfData.getVersion() - sLocalVersion = gConfigurationData.getVersion() - gLogger.info( "Checking versions\nremote: %s\nlocal: %s" % ( sRemoteVersion, sLocalVersion ) ) - if sRemoteVersion != sLocalVersion: - if not gConfigurationData.mergingEnabled(): - return S_ERROR( "Local and remote versions differ (%s vs %s). Cannot commit." % ( sLocalVersion, sRemoteVersion ) ) - else: - gLogger.info( "AutoMerging new data!" ) - if updateVersionOption: - return S_ERROR( "Cannot AutoMerge! version was overwritten" ) - result = self.__mergeIndependentUpdates( oRemoteConfData ) - if not result[ 'OK' ]: - gLogger.warn( "Could not AutoMerge!", result[ 'Message' ] ) - return S_ERROR( "AutoMerge failed: %s" % result[ 'Message' ] ) - requestedRemoteCFG = result[ 'Value' ] - gLogger.info( "AutoMerge successful!" ) - oRemoteConfData.setRemoteCFG( requestedRemoteCFG ) - #Test that configuration names are the same - sRemoteName = oRemoteConfData.getName() - sLocalName = gConfigurationData.getName() - if sRemoteName != sLocalName: - return S_ERROR( "Names differ: Server is %s and remote is %s" % ( sLocalName, sRemoteName ) ) - #Update and generate a new version - gLogger.info( "Committing new data..." ) - gConfigurationData.lock() - gLogger.info( "Setting the new CFG" ) - gConfigurationData.setRemoteCFG( oRemoteConfData.getRemoteCFG() ) - gConfigurationData.unlock() - gLogger.info( "Generating new version" ) - gConfigurationData.generateNewVersion() - #self.__checkSlavesStatus( forceWriteConfiguration = True ) - gLogger.info( "Writing new version to disk!" ) - retVal = gConfigurationData.writeRemoteConfigurationToDisk( "%s@%s" % ( commiter, gConfigurationData.getVersion() ) ) - gLogger.info( "New version it is!" ) - return retVal - - def getCompressedConfigurationData( self ): - return gConfigurationData.getCompressedData() - - def getVersion( self ): - return gConfigurationData.getVersion() - - def getCommitHistory( self ): - files = self.__getCfgBackups( gConfigurationData.getBackupDir() ) - backups = [ ".".join( fileName.split( "." )[1:-1] ).split( "@" ) for fileName in files ] - return backups - - def run( self ): + def run(self): while True: iWaitTime = gConfigurationData.getSlavesGraceTime() - time.sleep( iWaitTime ) - self.__checkSlavesStatus() - - def getVersionContents( self, date ): - backupDir = gConfigurationData.getBackupDir() - files = self.__getCfgBackups( backupDir, date ) - for fileName in files: - with zipfile.ZipFile( "%s/%s" % ( backupDir, fileName ), "r" ) as zFile: - cfgName = zFile.namelist()[0] - retVal = S_OK( zlib.compress( zFile.read( cfgName ) , 9 ) ) - return retVal - return S_ERROR( "Version %s does not exist" % date ) - - def __getCfgBackups( self, basePath, date = "", subPath = "" ): - rs = re.compile( r"^%s\..*%s.*\.zip$" % ( gConfigurationData.getName(), date ) ) - fsEntries = os.listdir( "%s/%s" % ( basePath, subPath ) ) - fsEntries.sort( reverse = True ) - backupsList = [] - for entry in fsEntries: - entryPath = "%s/%s/%s" % ( basePath, subPath, entry ) - if os.path.isdir( entryPath ): - backupsList.extend( self.__getCfgBackups( basePath, date, "%s/%s" % ( subPath, entry ) ) ) - elif os.path.isfile( entryPath ): - if rs.search( entry ): - backupsList.append( "%s/%s" % ( subPath, entry ) ) - return backupsList - - def __getPreviousCFG( self, oRemoteConfData ): - remoteExpectedVersion = oRemoteConfData.getVersion() - backupsList = self.__getCfgBackups( gConfigurationData.getBackupDir(), date = oRemoteConfData.getVersion() ) - if not backupsList: - return S_ERROR( "Could not AutoMerge. Could not retrieve original commiter's version" ) - prevRemoteConfData = ConfigurationData() - backFile = backupsList[0] - if backFile[0] == "/": - backFile = os.path.join( gConfigurationData.getBackupDir(), backFile[1:] ) - try: - prevRemoteConfData.loadConfigurationData( backFile ) - except Exception as e: - return S_ERROR( "Could not load original commiter's version: %s" % str( e ) ) - gLogger.info( "Loaded client original version %s" % prevRemoteConfData.getVersion() ) - return S_OK( prevRemoteConfData.getRemoteCFG() ) - - def _checkConflictsInModifications( self, realModList, reqModList, parentSection = "" ): - realModifiedSections = dict( [ ( modAc[1], modAc[3] ) for modAc in realModList if modAc[0].find( 'Sec' ) == len( modAc[0] ) - 3 ] ) - reqOptionsModificationList = dict( [ ( modAc[1], modAc[3] ) for modAc in reqModList if modAc[0].find( 'Opt' ) == len( modAc[0] ) - 3 ] ) - optionModRequests = 0 - for modAc in reqModList: - action = modAc[0] - objectName = modAc[1] - if action == "addSec": - if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s already exists" % ( parentSection, objectName ) ) - elif action == "delSec": - if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s cannot be deleted. It has been modified." % ( parentSection, objectName ) ) - elif action == "modSec": - if objectName in realModifiedSections: - result = self._checkConflictsInModifications( realModifiedSections[ objectName ], - modAc[3], "%s/%s" % ( parentSection, objectName ) ) - if not result[ 'OK' ]: - return result - for modAc in realModList: - action = modAc[0] - objectName = modAc[1] - if action.find( "Opt" ) == len( action ) - 3: - return S_ERROR( "Section %s cannot be merged. Option %s/%s has been modified" % ( parentSection, parentSection, objectName ) ) - return S_OK() - - def __mergeIndependentUpdates( self, oRemoteConfData ): - #return S_ERROR( "AutoMerge is still not finished. Meanwhile... why don't you get the newest conf and update from there?" ) - #Get all the CFGs - curSrvCFG = gConfigurationData.getRemoteCFG().clone() - curCliCFG = oRemoteConfData.getRemoteCFG().clone() - result = self.__getPreviousCFG( oRemoteConfData ) - if not result[ 'OK' ]: - return result - prevCliCFG = result[ 'Value' ] - #Try to merge curCli with curSrv. To do so we check the updates from - # prevCli -> curSrv VS prevCli -> curCli - prevCliToCurCliModList = prevCliCFG.getModifications( curCliCFG ) - prevCliToCurSrvModList = prevCliCFG.getModifications( curSrvCFG ) - result = self._checkConflictsInModifications( prevCliToCurSrvModList, - prevCliToCurCliModList ) - if not result[ 'OK' ]: - return S_ERROR( "Cannot AutoMerge: %s" % result[ 'Message' ] ) - #Merge! - result = curSrvCFG.applyModifications( prevCliToCurCliModList ) - if not result[ 'OK' ]: - return result - return S_OK( curSrvCFG ) + time.sleep(iWaitTime) + self._checkSlavesStatus() diff --git a/ConfigurationSystem/private/ServiceInterfaceBase.py b/ConfigurationSystem/private/ServiceInterfaceBase.py new file mode 100644 index 00000000000..0085a34df66 --- /dev/null +++ b/ConfigurationSystem/private/ServiceInterfaceBase.py @@ -0,0 +1,292 @@ +"""Service interface is the service who provide config for client and synchronize Master/Slave servers""" + +import os +import time +import re +import zipfile +import zlib + +import DIRAC +from DIRAC.Core.Utilities.File import mkDir +from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData, ConfigurationData +from DIRAC.ConfigurationSystem.private.Refresher import gRefresher +from DIRAC.FrameworkSystem.Client.Logger import gLogger +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR +from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient + + +class ServiceInterfaceBase(object): + """Service interface is the service who provide config for client and synchronize Master/Slave servers""" + + def __init__(self, sURL): + self.sURL = sURL + gLogger.info("Initializing Configuration Service", "URL is %s" % sURL) + self.__modificationsIgnoreMask = ['/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version'] + gConfigurationData.setAsService() + if not gConfigurationData.isMaster(): + gLogger.info("Starting configuration service as slave") + gRefresher.autoRefreshAndPublish(self.sURL) + else: + gLogger.info("Starting configuration service as master") + gRefresher.disable() + self.__loadConfigurationData() + self.dAliveSlaveServers = {} + + def __loadConfigurationData(self): + """ + As the name said, it just load the configuration + """ + mkDir(os.path.join(DIRAC.rootPath, "etc", "csbackup")) + gConfigurationData.loadConfigurationData() + if gConfigurationData.isMaster(): + bBuiltNewConfiguration = False + if not gConfigurationData.getName(): + DIRAC.abort(10, "Missing name for the configuration to be exported!") + gConfigurationData.exportName() + sVersion = gConfigurationData.getVersion() + if sVersion == "0": + gLogger.info("There's no version. Generating a new one") + gConfigurationData.generateNewVersion() + bBuiltNewConfiguration = True + + if self.sURL not in gConfigurationData.getServers(): + gConfigurationData.setServers(self.sURL) + bBuiltNewConfiguration = True + + gConfigurationData.setMasterServer(self.sURL) + + if bBuiltNewConfiguration: + gConfigurationData.writeRemoteConfigurationToDisk() + + def __generateNewVersion(self): + """ + After changing configuration, we use this method to save them + """ + if gConfigurationData.isMaster(): + gConfigurationData.generateNewVersion() + gConfigurationData.writeRemoteConfigurationToDisk() + + def publishSlaveServer(self, sSlaveURL): + """ + Called by the slave server via service, it register a new slave server + + :param sSlaveURL: url of slave server + """ + if not gConfigurationData.isMaster(): + return S_ERROR("Configuration modification is not allowed in this server") + + gLogger.info("Pinging slave %s" % sSlaveURL) + + rpcClient = ConfigurationServerClient(url=sSlaveURL, timeout=10, useCertificates=True) + retVal = rpcClient.ping() + if not retVal['OK']: + gLogger.info("Slave %s didn't reply" % sSlaveURL) + return + if retVal['Value']['name'] != 'Configuration/Server': + gLogger.info("Slave %s is not a CS serveR" % sSlaveURL) + return + bNewSlave = False + if sSlaveURL not in self.dAliveSlaveServers: + bNewSlave = True + gLogger.info("New slave registered", sSlaveURL) + self.dAliveSlaveServers[sSlaveURL] = time.time() + if bNewSlave: + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) + self.__generateNewVersion() + + def _checkSlavesStatus(self, forceWriteConfiguration=False): + """ + Check if Slaves server are still availlable + + :param forceWriteConfiguration=False: Force rewriting configuration after checking slaves + """ + gLogger.info("Checking status of slave servers") + iGraceTime = gConfigurationData.getSlavesGraceTime() + lSlaveURLs = self.dAliveSlaveServers.keys() + bModifiedSlaveServers = False + for sSlaveURL in lSlaveURLs: + if time.time() - self.dAliveSlaveServers[sSlaveURL] > iGraceTime: + gLogger.info("Found dead slave", sSlaveURL) + del self.dAliveSlaveServers[sSlaveURL] + bModifiedSlaveServers = True + if bModifiedSlaveServers or forceWriteConfiguration: + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) + self.__generateNewVersion() + + + def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): + """ + Update the configuration + """ + if not gConfigurationData.isMaster(): + return S_ERROR("Configuration modification is not allowed in this server") + # Load the data in a ConfigurationData object + oRemoteConfData = ConfigurationData(False) + oRemoteConfData.loadRemoteCFGFromCompressedMem(sBuffer) + if updateVersionOption: + oRemoteConfData.setVersion(gConfigurationData.getVersion()) + # Test that remote and new versions are the same + sRemoteVersion = oRemoteConfData.getVersion() + sLocalVersion = gConfigurationData.getVersion() + gLogger.info("Checking versions\nremote: %s\nlocal: %s" % (sRemoteVersion, sLocalVersion)) + if sRemoteVersion != sLocalVersion: + if not gConfigurationData.mergingEnabled(): + return S_ERROR("Local and remote versions differ (%s vs %s). Cannot commit." % (sLocalVersion, sRemoteVersion)) + else: + gLogger.info("AutoMerging new data!") + if updateVersionOption: + return S_ERROR("Cannot AutoMerge! version was overwritten") + result = self.__mergeIndependentUpdates(oRemoteConfData) + if not result['OK']: + gLogger.warn("Could not AutoMerge!", result['Message']) + return S_ERROR("AutoMerge failed: %s" % result['Message']) + requestedRemoteCFG = result['Value'] + gLogger.info("AutoMerge successful!") + oRemoteConfData.setRemoteCFG(requestedRemoteCFG) + # Test that configuration names are the same + sRemoteName = oRemoteConfData.getName() + sLocalName = gConfigurationData.getName() + if sRemoteName != sLocalName: + return S_ERROR("Names differ: Server is %s and remote is %s" % (sLocalName, sRemoteName)) + # Update and generate a new version + gLogger.info("Committing new data...") + gConfigurationData.lock() + gLogger.info("Setting the new CFG") + gConfigurationData.setRemoteCFG(oRemoteConfData.getRemoteCFG()) + gConfigurationData.unlock() + gLogger.info("Generating new version") + gConfigurationData.generateNewVersion() + #self._checkSlavesStatus( forceWriteConfiguration = True ) + gLogger.info("Writing new version to disk!") + retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) + gLogger.info("New version it is!") + return retVal + + def getCompressedConfigurationData(self): + """ + Get the configuration + """ + return gConfigurationData.getCompressedData() + + def getVersion(self): + """ + Get the version + """ + return gConfigurationData.getVersion() + + def getCommitHistory(self): + """ + Get the history + """ + files = self.__getCfgBackups(gConfigurationData.getBackupDir()) + backups = [".".join(fileName.split(".")[1:-1]).split("@") for fileName in files] + return backups + + def getVersionContents(self, date): + """ + Get an old version + """ + backupDir = gConfigurationData.getBackupDir() + files = self.__getCfgBackups(backupDir, date) + for fileName in files: + with zipfile.ZipFile("%s/%s" % (backupDir, fileName), "r") as zFile: + cfgName = zFile.namelist()[0] + retVal = S_OK(zlib.compress(zFile.read(cfgName), 9)) + return retVal + return S_ERROR("Version %s does not exist" % date) + + def __getCfgBackups(self, basePath, date="", subPath=""): + """ + Get list of backup file + """ + rs = re.compile(r"^%s\..*%s.*\.zip$" % (gConfigurationData.getName(), date)) + fsEntries = os.listdir("%s/%s" % (basePath, subPath)) + fsEntries.sort(reverse=True) + backupsList = [] + for entry in fsEntries: + entryPath = "%s/%s/%s" % (basePath, subPath, entry) + if os.path.isdir(entryPath): + backupsList.extend(self.__getCfgBackups(basePath, date, "%s/%s" % (subPath, entry))) + elif os.path.isfile(entryPath): + if rs.search(entry): + backupsList.append("%s/%s" % (subPath, entry)) + return backupsList + + def __getPreviousCFG(self, oRemoteConfData): + """ + Get last configurtion + """ + # remoteExpectedVersion = oRemoteConfData.getVersion() + backupsList = self.__getCfgBackups(gConfigurationData.getBackupDir(), date=oRemoteConfData.getVersion()) + if not backupsList: + return S_ERROR("Could not AutoMerge. Could not retrieve original commiter's version") + prevRemoteConfData = ConfigurationData() + backFile = backupsList[0] + if backFile[0] == "/": + backFile = os.path.join(gConfigurationData.getBackupDir(), backFile[1:]) + try: + prevRemoteConfData.loadConfigurationData(backFile) + except Exception as e: + return S_ERROR("Could not load original commiter's version: %s" % str(e)) + gLogger.info("Loaded client original version %s" % prevRemoteConfData.getVersion()) + return S_OK(prevRemoteConfData.getRemoteCFG()) + + def _checkConflictsInModifications(self, realModList, reqModList, parentSection=""): + """ + Check for conflicts + """ + realModifiedSections = dict([(modAc[1], modAc[3]) + for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) + #reqOptionsModificationList = dict([(modAc[1], modAc[3]) + # for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) + #optionModRequests = 0 + for modAc in reqModList: + action = modAc[0] + objectName = modAc[1] + if action == "addSec": + if objectName in realModifiedSections: + return S_ERROR("Section %s/%s already exists" % (parentSection, objectName)) + elif action == "delSec": + if objectName in realModifiedSections: + return S_ERROR("Section %s/%s cannot be deleted. It has been modified." % (parentSection, objectName)) + elif action == "modSec": + if objectName in realModifiedSections: + result = self._checkConflictsInModifications(realModifiedSections[objectName], + modAc[3], "%s/%s" % (parentSection, objectName)) + if not result['OK']: + return result + for modAc in realModList: + action = modAc[0] + objectName = modAc[1] + if action.find("Opt") == len(action) - 3: + return S_ERROR( + "Section %s cannot be merged. Option %s/%s has been modified" % + (parentSection, parentSection, objectName)) + return S_OK() + + def __mergeIndependentUpdates(self, oRemoteConfData): + """ + Merge configuration + """ + # Get all the CFGs + curSrvCFG = gConfigurationData.getRemoteCFG().clone() + curCliCFG = oRemoteConfData.getRemoteCFG().clone() + result = self.__getPreviousCFG(oRemoteConfData) + if not result['OK']: + return result + prevCliCFG = result['Value'] + # Try to merge curCli with curSrv. To do so we check the updates from + # prevCli -> curSrv VS prevCli -> curCli + prevCliToCurCliModList = prevCliCFG.getModifications(curCliCFG) + prevCliToCurSrvModList = prevCliCFG.getModifications(curSrvCFG) + result = self._checkConflictsInModifications(prevCliToCurSrvModList, + prevCliToCurCliModList) + if not result['OK']: + return S_ERROR("Cannot AutoMerge: %s" % result['Message']) + # Merge! + result = curSrvCFG.applyModifications(prevCliToCurCliModList) + if not result['OK']: + return result + return S_OK(curSrvCFG) From bfa1be68fbe19d0e8331f1be232078b82d20e482 Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Thu, 16 Aug 2018 15:48:37 +0200 Subject: [PATCH 4/7] Adding whoami in services --- Core/DISET/RequestHandler.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Core/DISET/RequestHandler.py b/Core/DISET/RequestHandler.py index 73d9ecc0c2b..682531701dd 100755 --- a/Core/DISET/RequestHandler.py +++ b/Core/DISET/RequestHandler.py @@ -490,6 +490,17 @@ def export_ping(self): return S_OK(dInfo) + types_whoami = [] + auth_whoami = ['all'] + def export_whoami(self): + """ + A simple whoami, returns all credential dictionnary, except certificate chain object. + """ + credDict = self.srv_getRemoteCredentials() + if 'x509Chain' in credDict: + del credDict['x509Chain'] + return S_OK(credDict) + types_echo = [basestring] @staticmethod From 0a0e7775151c6fd6fc143edd21a8a91989ae0a35 Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Fri, 17 Aug 2018 11:43:09 +0200 Subject: [PATCH 5/7] pep8 --- ConfigurationSystem/Client/CSAPI.py | 2 +- ConfigurationSystem/private/ServiceInterfaceBase.py | 7 +++---- Core/DISET/RequestHandler.py | 13 +++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ConfigurationSystem/Client/CSAPI.py b/ConfigurationSystem/Client/CSAPI.py index 1902be16299..d7ff19ad20a 100644 --- a/ConfigurationSystem/Client/CSAPI.py +++ b/ConfigurationSystem/Client/CSAPI.py @@ -83,7 +83,7 @@ def initialize(self): if not retVal['OK']: self.__initialized = S_ERROR("Master server is not known. Is everything initialized?") return self.__initialized - self.__rpcClient = ConfigurationServerClient(url=gConfig.getValue( "/DIRAC/Configuration/MasterServer", "" )) + self.__rpcClient = ConfigurationServerClient(url=gConfig.getValue("/DIRAC/Configuration/MasterServer", "")) self.__csMod = Modificator(self.__rpcClient, "%s - %s - %s" % (self.__userGroup, self.__userDN, Time.dateTime().strftime("%Y-%m-%d %H:%M:%S"))) retVal = self.downloadCSData() diff --git a/ConfigurationSystem/private/ServiceInterfaceBase.py b/ConfigurationSystem/private/ServiceInterfaceBase.py index 0085a34df66..ea87d0bbd56 100644 --- a/ConfigurationSystem/private/ServiceInterfaceBase.py +++ b/ConfigurationSystem/private/ServiceInterfaceBase.py @@ -115,7 +115,6 @@ def _checkSlavesStatus(self, forceWriteConfiguration=False): ", ".join(self.dAliveSlaveServers.keys()))) self.__generateNewVersion() - def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): """ Update the configuration @@ -158,7 +157,7 @@ def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): gConfigurationData.unlock() gLogger.info("Generating new version") gConfigurationData.generateNewVersion() - #self._checkSlavesStatus( forceWriteConfiguration = True ) + # self._checkSlavesStatus( forceWriteConfiguration = True ) gLogger.info("Writing new version to disk!") retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) gLogger.info("New version it is!") @@ -239,9 +238,9 @@ def _checkConflictsInModifications(self, realModList, reqModList, parentSection= """ realModifiedSections = dict([(modAc[1], modAc[3]) for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) - #reqOptionsModificationList = dict([(modAc[1], modAc[3]) + # reqOptionsModificationList = dict([(modAc[1], modAc[3]) # for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) - #optionModRequests = 0 + # optionModRequests = 0 for modAc in reqModList: action = modAc[0] objectName = modAc[1] diff --git a/Core/DISET/RequestHandler.py b/Core/DISET/RequestHandler.py index 682531701dd..125d8370639 100755 --- a/Core/DISET/RequestHandler.py +++ b/Core/DISET/RequestHandler.py @@ -253,7 +253,7 @@ def __RPCCallFunction(self, method, args): try: # Get the method we are trying to call oMethod = getattr(self, realMethod) - except: + except BaseException: return S_ERROR("Unknown method %s" % method) # Check if the client sends correct arguments dRetVal = self.__checkExpectedArgumentTypes(method, args) @@ -290,7 +290,7 @@ def __checkExpectedArgumentTypes(self, method, args): sListName = "types_%s" % method try: oTypesList = getattr(self, sListName) - except: + except BaseException: gLogger.error("There's no types info for method", "export_%s" % method) return S_ERROR("Handler error for server %s while processing method %s" % (self.serviceInfoDict['serviceName'], method)) @@ -361,7 +361,7 @@ def _rh_executeConnectionCallback(self, methodName, args=False): gLogger.debug("Callback to %s" % realMethod) try: oMethod = getattr(self, realMethod) - except: + except BaseException: # No callback defined by handler return S_OK() try: @@ -383,7 +383,7 @@ def _rh_executeMessageCallback(self, msgObj): startTime = time.time() try: oMethod = getattr(self, methodName) - except: + except BaseException: return S_ERROR("Handler function for message %s does not exist!" % msgName) self.__lockManager.lock(methodName) try: @@ -466,7 +466,7 @@ def export_ping(self): with open("/proc/uptime") as oFD: iUptime = long(float(oFD.readline().split()[0].strip())) dInfo['host uptime'] = iUptime - except: + except BaseException: pass startTime = self.serviceInfoDict['serviceStartTime'] dInfo['service start time'] = self.serviceInfoDict['serviceStartTime'] @@ -477,7 +477,7 @@ def export_ping(self): with open("/proc/loadavg") as oFD: sLine = oFD.readline() dInfo['load'] = " ".join(sLine.split()[:3]) - except: + except BaseException: pass dInfo['name'] = self.serviceInfoDict['serviceName'] stTimes = os.times() @@ -492,6 +492,7 @@ def export_ping(self): types_whoami = [] auth_whoami = ['all'] + def export_whoami(self): """ A simple whoami, returns all credential dictionnary, except certificate chain object. From e5fe73f545facc6a43a8c1dde7ac69cc75c4ec53 Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Fri, 17 Aug 2018 13:50:25 +0200 Subject: [PATCH 6/7] Merging all ServiceInterface in a single file --- .../private/ServiceInterface.py | 292 +++++++++++++++++- .../private/ServiceInterfaceBase.py | 291 ----------------- Core/DISET/RequestHandler.py | 4 +- 3 files changed, 290 insertions(+), 297 deletions(-) delete mode 100644 ConfigurationSystem/private/ServiceInterfaceBase.py diff --git a/ConfigurationSystem/private/ServiceInterface.py b/ConfigurationSystem/private/ServiceInterface.py index 3ed1a132bad..d7fcfd450b4 100755 --- a/ConfigurationSystem/private/ServiceInterface.py +++ b/ConfigurationSystem/private/ServiceInterface.py @@ -1,14 +1,298 @@ -""" Threaded implementation of service interface +""" +Service interface is the service who provide config for client and synchronize Master/Slave servers """ +__RCSID__ = "$Id$" import time import threading +import os +import re +import zipfile +import zlib + +import DIRAC from DIRAC import gLogger +from DIRAC.Core.Utilities.File import mkDir +from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData, ConfigurationData +from DIRAC.ConfigurationSystem.private.Refresher import gRefresher +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR +from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient -from DIRAC.ConfigurationSystem.private.ServiceInterfaceBase import ServiceInterfaceBase -from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData -__RCSID__ = "$Id$" +class ServiceInterfaceBase(object): + """Service interface is the service who provide config for client and synchronize Master/Slave servers""" + + def __init__(self, sURL): + self.sURL = sURL + gLogger.info("Initializing Configuration Service", "URL is %s" % sURL) + self.__modificationsIgnoreMask = ['/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version'] + gConfigurationData.setAsService() + if not gConfigurationData.isMaster(): + gLogger.info("Starting configuration service as slave") + gRefresher.autoRefreshAndPublish(self.sURL) + else: + gLogger.info("Starting configuration service as master") + gRefresher.disable() + self.__loadConfigurationData() + self.dAliveSlaveServers = {} + + def __loadConfigurationData(self): + """ + As the name said, it just load the configuration + """ + mkDir(os.path.join(DIRAC.rootPath, "etc", "csbackup")) + gConfigurationData.loadConfigurationData() + if gConfigurationData.isMaster(): + bBuiltNewConfiguration = False + if not gConfigurationData.getName(): + DIRAC.abort(10, "Missing name for the configuration to be exported!") + gConfigurationData.exportName() + sVersion = gConfigurationData.getVersion() + if sVersion == "0": + gLogger.info("There's no version. Generating a new one") + gConfigurationData.generateNewVersion() + bBuiltNewConfiguration = True + + if self.sURL not in gConfigurationData.getServers(): + gConfigurationData.setServers(self.sURL) + bBuiltNewConfiguration = True + + gConfigurationData.setMasterServer(self.sURL) + + if bBuiltNewConfiguration: + gConfigurationData.writeRemoteConfigurationToDisk() + + def __generateNewVersion(self): + """ + After changing configuration, we use this method to save them + """ + if gConfigurationData.isMaster(): + gConfigurationData.generateNewVersion() + gConfigurationData.writeRemoteConfigurationToDisk() + + def publishSlaveServer(self, sSlaveURL): + """ + Called by the slave server via service, it register a new slave server + + :param sSlaveURL: url of slave server + """ + if not gConfigurationData.isMaster(): + return S_ERROR("Configuration modification is not allowed in this server") + + gLogger.info("Pinging slave %s" % sSlaveURL) + + rpcClient = ConfigurationServerClient(url=sSlaveURL, timeout=10, useCertificates=True) + retVal = rpcClient.ping() + if not retVal['OK']: + gLogger.info("Slave %s didn't reply" % sSlaveURL) + return + if retVal['Value']['name'] != 'Configuration/Server': + gLogger.info("Slave %s is not a CS serveR" % sSlaveURL) + return + bNewSlave = False + if sSlaveURL not in self.dAliveSlaveServers: + bNewSlave = True + gLogger.info("New slave registered", sSlaveURL) + self.dAliveSlaveServers[sSlaveURL] = time.time() + if bNewSlave: + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) + self.__generateNewVersion() + + def _checkSlavesStatus(self, forceWriteConfiguration=False): + """ + Check if Slaves server are still availlable + + :param forceWriteConfiguration=False: Force rewriting configuration after checking slaves + """ + gLogger.info("Checking status of slave servers") + iGraceTime = gConfigurationData.getSlavesGraceTime() + lSlaveURLs = self.dAliveSlaveServers.keys() + bModifiedSlaveServers = False + for sSlaveURL in lSlaveURLs: + if time.time() - self.dAliveSlaveServers[sSlaveURL] > iGraceTime: + gLogger.info("Found dead slave", sSlaveURL) + del self.dAliveSlaveServers[sSlaveURL] + bModifiedSlaveServers = True + if bModifiedSlaveServers or forceWriteConfiguration: + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) + self.__generateNewVersion() + + def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): + """ + Update the configuration + """ + if not gConfigurationData.isMaster(): + return S_ERROR("Configuration modification is not allowed in this server") + # Load the data in a ConfigurationData object + oRemoteConfData = ConfigurationData(False) + oRemoteConfData.loadRemoteCFGFromCompressedMem(sBuffer) + if updateVersionOption: + oRemoteConfData.setVersion(gConfigurationData.getVersion()) + # Test that remote and new versions are the same + sRemoteVersion = oRemoteConfData.getVersion() + sLocalVersion = gConfigurationData.getVersion() + gLogger.info("Checking versions\nremote: %s\nlocal: %s" % (sRemoteVersion, sLocalVersion)) + if sRemoteVersion != sLocalVersion: + if not gConfigurationData.mergingEnabled(): + return S_ERROR("Local and remote versions differ (%s vs %s). Cannot commit." % (sLocalVersion, sRemoteVersion)) + else: + gLogger.info("AutoMerging new data!") + if updateVersionOption: + return S_ERROR("Cannot AutoMerge! version was overwritten") + result = self.__mergeIndependentUpdates(oRemoteConfData) + if not result['OK']: + gLogger.warn("Could not AutoMerge!", result['Message']) + return S_ERROR("AutoMerge failed: %s" % result['Message']) + requestedRemoteCFG = result['Value'] + gLogger.info("AutoMerge successful!") + oRemoteConfData.setRemoteCFG(requestedRemoteCFG) + # Test that configuration names are the same + sRemoteName = oRemoteConfData.getName() + sLocalName = gConfigurationData.getName() + if sRemoteName != sLocalName: + return S_ERROR("Names differ: Server is %s and remote is %s" % (sLocalName, sRemoteName)) + # Update and generate a new version + gLogger.info("Committing new data...") + gConfigurationData.lock() + gLogger.info("Setting the new CFG") + gConfigurationData.setRemoteCFG(oRemoteConfData.getRemoteCFG()) + gConfigurationData.unlock() + gLogger.info("Generating new version") + gConfigurationData.generateNewVersion() + # self._checkSlavesStatus( forceWriteConfiguration = True ) + gLogger.info("Writing new version to disk!") + retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) + gLogger.info("New version it is!") + return retVal + + def getCompressedConfigurationData(self): + """ + Get the configuration + """ + return gConfigurationData.getCompressedData() + + def getVersion(self): + """ + Get the version + """ + return gConfigurationData.getVersion() + + def getCommitHistory(self): + """ + Get the history + """ + files = self.__getCfgBackups(gConfigurationData.getBackupDir()) + backups = [".".join(fileName.split(".")[1:-1]).split("@") for fileName in files] + return backups + + def getVersionContents(self, date): + """ + Get an old version + """ + backupDir = gConfigurationData.getBackupDir() + files = self.__getCfgBackups(backupDir, date) + for fileName in files: + with zipfile.ZipFile("%s/%s" % (backupDir, fileName), "r") as zFile: + cfgName = zFile.namelist()[0] + retVal = S_OK(zlib.compress(zFile.read(cfgName), 9)) + return retVal + return S_ERROR("Version %s does not exist" % date) + + def __getCfgBackups(self, basePath, date="", subPath=""): + """ + Get list of backup file + """ + rs = re.compile(r"^%s\..*%s.*\.zip$" % (gConfigurationData.getName(), date)) + fsEntries = os.listdir("%s/%s" % (basePath, subPath)) + fsEntries.sort(reverse=True) + backupsList = [] + for entry in fsEntries: + entryPath = "%s/%s/%s" % (basePath, subPath, entry) + if os.path.isdir(entryPath): + backupsList.extend(self.__getCfgBackups(basePath, date, "%s/%s" % (subPath, entry))) + elif os.path.isfile(entryPath): + if rs.search(entry): + backupsList.append("%s/%s" % (subPath, entry)) + return backupsList + + def __getPreviousCFG(self, oRemoteConfData): + """ + Get last configurtion + """ + # remoteExpectedVersion = oRemoteConfData.getVersion() + backupsList = self.__getCfgBackups(gConfigurationData.getBackupDir(), date=oRemoteConfData.getVersion()) + if not backupsList: + return S_ERROR("Could not AutoMerge. Could not retrieve original commiter's version") + prevRemoteConfData = ConfigurationData() + backFile = backupsList[0] + if backFile[0] == "/": + backFile = os.path.join(gConfigurationData.getBackupDir(), backFile[1:]) + try: + prevRemoteConfData.loadConfigurationData(backFile) + except Exception as e: + return S_ERROR("Could not load original commiter's version: %s" % str(e)) + gLogger.info("Loaded client original version %s" % prevRemoteConfData.getVersion()) + return S_OK(prevRemoteConfData.getRemoteCFG()) + + def _checkConflictsInModifications(self, realModList, reqModList, parentSection=""): + """ + Check for conflicts + """ + realModifiedSections = dict([(modAc[1], modAc[3]) + for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) + # reqOptionsModificationList = dict([(modAc[1], modAc[3]) + # for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) + # optionModRequests = 0 + for modAc in reqModList: + action = modAc[0] + objectName = modAc[1] + if action == "addSec": + if objectName in realModifiedSections: + return S_ERROR("Section %s/%s already exists" % (parentSection, objectName)) + elif action == "delSec": + if objectName in realModifiedSections: + return S_ERROR("Section %s/%s cannot be deleted. It has been modified." % (parentSection, objectName)) + elif action == "modSec": + if objectName in realModifiedSections: + result = self._checkConflictsInModifications(realModifiedSections[objectName], + modAc[3], "%s/%s" % (parentSection, objectName)) + if not result['OK']: + return result + for modAc in realModList: + action = modAc[0] + objectName = modAc[1] + if action.find("Opt") == len(action) - 3: + return S_ERROR( + "Section %s cannot be merged. Option %s/%s has been modified" % + (parentSection, parentSection, objectName)) + return S_OK() + + def __mergeIndependentUpdates(self, oRemoteConfData): + """ + Merge configuration + """ + # Get all the CFGs + curSrvCFG = gConfigurationData.getRemoteCFG().clone() + curCliCFG = oRemoteConfData.getRemoteCFG().clone() + result = self.__getPreviousCFG(oRemoteConfData) + if not result['OK']: + return result + prevCliCFG = result['Value'] + # Try to merge curCli with curSrv. To do so we check the updates from + # prevCli -> curSrv VS prevCli -> curCli + prevCliToCurCliModList = prevCliCFG.getModifications(curCliCFG) + prevCliToCurSrvModList = prevCliCFG.getModifications(curSrvCFG) + result = self._checkConflictsInModifications(prevCliToCurSrvModList, + prevCliToCurCliModList) + if not result['OK']: + return S_ERROR("Cannot AutoMerge: %s" % result['Message']) + # Merge! + result = curSrvCFG.applyModifications(prevCliToCurCliModList) + if not result['OK']: + return result + return S_OK(curSrvCFG) class ServiceInterface(ServiceInterfaceBase, threading.Thread): diff --git a/ConfigurationSystem/private/ServiceInterfaceBase.py b/ConfigurationSystem/private/ServiceInterfaceBase.py deleted file mode 100644 index ea87d0bbd56..00000000000 --- a/ConfigurationSystem/private/ServiceInterfaceBase.py +++ /dev/null @@ -1,291 +0,0 @@ -"""Service interface is the service who provide config for client and synchronize Master/Slave servers""" - -import os -import time -import re -import zipfile -import zlib - -import DIRAC -from DIRAC.Core.Utilities.File import mkDir -from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData, ConfigurationData -from DIRAC.ConfigurationSystem.private.Refresher import gRefresher -from DIRAC.FrameworkSystem.Client.Logger import gLogger -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR -from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient - - -class ServiceInterfaceBase(object): - """Service interface is the service who provide config for client and synchronize Master/Slave servers""" - - def __init__(self, sURL): - self.sURL = sURL - gLogger.info("Initializing Configuration Service", "URL is %s" % sURL) - self.__modificationsIgnoreMask = ['/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version'] - gConfigurationData.setAsService() - if not gConfigurationData.isMaster(): - gLogger.info("Starting configuration service as slave") - gRefresher.autoRefreshAndPublish(self.sURL) - else: - gLogger.info("Starting configuration service as master") - gRefresher.disable() - self.__loadConfigurationData() - self.dAliveSlaveServers = {} - - def __loadConfigurationData(self): - """ - As the name said, it just load the configuration - """ - mkDir(os.path.join(DIRAC.rootPath, "etc", "csbackup")) - gConfigurationData.loadConfigurationData() - if gConfigurationData.isMaster(): - bBuiltNewConfiguration = False - if not gConfigurationData.getName(): - DIRAC.abort(10, "Missing name for the configuration to be exported!") - gConfigurationData.exportName() - sVersion = gConfigurationData.getVersion() - if sVersion == "0": - gLogger.info("There's no version. Generating a new one") - gConfigurationData.generateNewVersion() - bBuiltNewConfiguration = True - - if self.sURL not in gConfigurationData.getServers(): - gConfigurationData.setServers(self.sURL) - bBuiltNewConfiguration = True - - gConfigurationData.setMasterServer(self.sURL) - - if bBuiltNewConfiguration: - gConfigurationData.writeRemoteConfigurationToDisk() - - def __generateNewVersion(self): - """ - After changing configuration, we use this method to save them - """ - if gConfigurationData.isMaster(): - gConfigurationData.generateNewVersion() - gConfigurationData.writeRemoteConfigurationToDisk() - - def publishSlaveServer(self, sSlaveURL): - """ - Called by the slave server via service, it register a new slave server - - :param sSlaveURL: url of slave server - """ - if not gConfigurationData.isMaster(): - return S_ERROR("Configuration modification is not allowed in this server") - - gLogger.info("Pinging slave %s" % sSlaveURL) - - rpcClient = ConfigurationServerClient(url=sSlaveURL, timeout=10, useCertificates=True) - retVal = rpcClient.ping() - if not retVal['OK']: - gLogger.info("Slave %s didn't reply" % sSlaveURL) - return - if retVal['Value']['name'] != 'Configuration/Server': - gLogger.info("Slave %s is not a CS serveR" % sSlaveURL) - return - bNewSlave = False - if sSlaveURL not in self.dAliveSlaveServers: - bNewSlave = True - gLogger.info("New slave registered", sSlaveURL) - self.dAliveSlaveServers[sSlaveURL] = time.time() - if bNewSlave: - gConfigurationData.setServers("%s, %s" % (self.sURL, - ", ".join(self.dAliveSlaveServers.keys()))) - self.__generateNewVersion() - - def _checkSlavesStatus(self, forceWriteConfiguration=False): - """ - Check if Slaves server are still availlable - - :param forceWriteConfiguration=False: Force rewriting configuration after checking slaves - """ - gLogger.info("Checking status of slave servers") - iGraceTime = gConfigurationData.getSlavesGraceTime() - lSlaveURLs = self.dAliveSlaveServers.keys() - bModifiedSlaveServers = False - for sSlaveURL in lSlaveURLs: - if time.time() - self.dAliveSlaveServers[sSlaveURL] > iGraceTime: - gLogger.info("Found dead slave", sSlaveURL) - del self.dAliveSlaveServers[sSlaveURL] - bModifiedSlaveServers = True - if bModifiedSlaveServers or forceWriteConfiguration: - gConfigurationData.setServers("%s, %s" % (self.sURL, - ", ".join(self.dAliveSlaveServers.keys()))) - self.__generateNewVersion() - - def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): - """ - Update the configuration - """ - if not gConfigurationData.isMaster(): - return S_ERROR("Configuration modification is not allowed in this server") - # Load the data in a ConfigurationData object - oRemoteConfData = ConfigurationData(False) - oRemoteConfData.loadRemoteCFGFromCompressedMem(sBuffer) - if updateVersionOption: - oRemoteConfData.setVersion(gConfigurationData.getVersion()) - # Test that remote and new versions are the same - sRemoteVersion = oRemoteConfData.getVersion() - sLocalVersion = gConfigurationData.getVersion() - gLogger.info("Checking versions\nremote: %s\nlocal: %s" % (sRemoteVersion, sLocalVersion)) - if sRemoteVersion != sLocalVersion: - if not gConfigurationData.mergingEnabled(): - return S_ERROR("Local and remote versions differ (%s vs %s). Cannot commit." % (sLocalVersion, sRemoteVersion)) - else: - gLogger.info("AutoMerging new data!") - if updateVersionOption: - return S_ERROR("Cannot AutoMerge! version was overwritten") - result = self.__mergeIndependentUpdates(oRemoteConfData) - if not result['OK']: - gLogger.warn("Could not AutoMerge!", result['Message']) - return S_ERROR("AutoMerge failed: %s" % result['Message']) - requestedRemoteCFG = result['Value'] - gLogger.info("AutoMerge successful!") - oRemoteConfData.setRemoteCFG(requestedRemoteCFG) - # Test that configuration names are the same - sRemoteName = oRemoteConfData.getName() - sLocalName = gConfigurationData.getName() - if sRemoteName != sLocalName: - return S_ERROR("Names differ: Server is %s and remote is %s" % (sLocalName, sRemoteName)) - # Update and generate a new version - gLogger.info("Committing new data...") - gConfigurationData.lock() - gLogger.info("Setting the new CFG") - gConfigurationData.setRemoteCFG(oRemoteConfData.getRemoteCFG()) - gConfigurationData.unlock() - gLogger.info("Generating new version") - gConfigurationData.generateNewVersion() - # self._checkSlavesStatus( forceWriteConfiguration = True ) - gLogger.info("Writing new version to disk!") - retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) - gLogger.info("New version it is!") - return retVal - - def getCompressedConfigurationData(self): - """ - Get the configuration - """ - return gConfigurationData.getCompressedData() - - def getVersion(self): - """ - Get the version - """ - return gConfigurationData.getVersion() - - def getCommitHistory(self): - """ - Get the history - """ - files = self.__getCfgBackups(gConfigurationData.getBackupDir()) - backups = [".".join(fileName.split(".")[1:-1]).split("@") for fileName in files] - return backups - - def getVersionContents(self, date): - """ - Get an old version - """ - backupDir = gConfigurationData.getBackupDir() - files = self.__getCfgBackups(backupDir, date) - for fileName in files: - with zipfile.ZipFile("%s/%s" % (backupDir, fileName), "r") as zFile: - cfgName = zFile.namelist()[0] - retVal = S_OK(zlib.compress(zFile.read(cfgName), 9)) - return retVal - return S_ERROR("Version %s does not exist" % date) - - def __getCfgBackups(self, basePath, date="", subPath=""): - """ - Get list of backup file - """ - rs = re.compile(r"^%s\..*%s.*\.zip$" % (gConfigurationData.getName(), date)) - fsEntries = os.listdir("%s/%s" % (basePath, subPath)) - fsEntries.sort(reverse=True) - backupsList = [] - for entry in fsEntries: - entryPath = "%s/%s/%s" % (basePath, subPath, entry) - if os.path.isdir(entryPath): - backupsList.extend(self.__getCfgBackups(basePath, date, "%s/%s" % (subPath, entry))) - elif os.path.isfile(entryPath): - if rs.search(entry): - backupsList.append("%s/%s" % (subPath, entry)) - return backupsList - - def __getPreviousCFG(self, oRemoteConfData): - """ - Get last configurtion - """ - # remoteExpectedVersion = oRemoteConfData.getVersion() - backupsList = self.__getCfgBackups(gConfigurationData.getBackupDir(), date=oRemoteConfData.getVersion()) - if not backupsList: - return S_ERROR("Could not AutoMerge. Could not retrieve original commiter's version") - prevRemoteConfData = ConfigurationData() - backFile = backupsList[0] - if backFile[0] == "/": - backFile = os.path.join(gConfigurationData.getBackupDir(), backFile[1:]) - try: - prevRemoteConfData.loadConfigurationData(backFile) - except Exception as e: - return S_ERROR("Could not load original commiter's version: %s" % str(e)) - gLogger.info("Loaded client original version %s" % prevRemoteConfData.getVersion()) - return S_OK(prevRemoteConfData.getRemoteCFG()) - - def _checkConflictsInModifications(self, realModList, reqModList, parentSection=""): - """ - Check for conflicts - """ - realModifiedSections = dict([(modAc[1], modAc[3]) - for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) - # reqOptionsModificationList = dict([(modAc[1], modAc[3]) - # for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) - # optionModRequests = 0 - for modAc in reqModList: - action = modAc[0] - objectName = modAc[1] - if action == "addSec": - if objectName in realModifiedSections: - return S_ERROR("Section %s/%s already exists" % (parentSection, objectName)) - elif action == "delSec": - if objectName in realModifiedSections: - return S_ERROR("Section %s/%s cannot be deleted. It has been modified." % (parentSection, objectName)) - elif action == "modSec": - if objectName in realModifiedSections: - result = self._checkConflictsInModifications(realModifiedSections[objectName], - modAc[3], "%s/%s" % (parentSection, objectName)) - if not result['OK']: - return result - for modAc in realModList: - action = modAc[0] - objectName = modAc[1] - if action.find("Opt") == len(action) - 3: - return S_ERROR( - "Section %s cannot be merged. Option %s/%s has been modified" % - (parentSection, parentSection, objectName)) - return S_OK() - - def __mergeIndependentUpdates(self, oRemoteConfData): - """ - Merge configuration - """ - # Get all the CFGs - curSrvCFG = gConfigurationData.getRemoteCFG().clone() - curCliCFG = oRemoteConfData.getRemoteCFG().clone() - result = self.__getPreviousCFG(oRemoteConfData) - if not result['OK']: - return result - prevCliCFG = result['Value'] - # Try to merge curCli with curSrv. To do so we check the updates from - # prevCli -> curSrv VS prevCli -> curCli - prevCliToCurCliModList = prevCliCFG.getModifications(curCliCFG) - prevCliToCurSrvModList = prevCliCFG.getModifications(curSrvCFG) - result = self._checkConflictsInModifications(prevCliToCurSrvModList, - prevCliToCurCliModList) - if not result['OK']: - return S_ERROR("Cannot AutoMerge: %s" % result['Message']) - # Merge! - result = curSrvCFG.applyModifications(prevCliToCurCliModList) - if not result['OK']: - return result - return S_OK(curSrvCFG) diff --git a/Core/DISET/RequestHandler.py b/Core/DISET/RequestHandler.py index 125d8370639..3a0a21e0fdc 100755 --- a/Core/DISET/RequestHandler.py +++ b/Core/DISET/RequestHandler.py @@ -498,8 +498,8 @@ def export_whoami(self): A simple whoami, returns all credential dictionnary, except certificate chain object. """ credDict = self.srv_getRemoteCredentials() - if 'x509Chain' in credDict: - del credDict['x509Chain'] + # Remove the certificate chain (client already have it) + credDict.pop('x509Chain', None) return S_OK(credDict) types_echo = [basestring] From c69f0d4fd20a28e1a49c9e1ceeaa91f93571e0ed Mon Sep 17 00:00:00 2001 From: louisjdmartin Date: Mon, 20 Aug 2018 11:28:51 +0200 Subject: [PATCH 7/7] Use 'Configuration/Server' as default URL in ConfigurationServerClient --- ConfigurationSystem/Client/ConfigurationServerClient.py | 5 ++++- ConfigurationSystem/private/ServiceInterface.py | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/ConfigurationSystem/Client/ConfigurationServerClient.py b/ConfigurationSystem/Client/ConfigurationServerClient.py index cd834275b6b..aeb3efd245c 100644 --- a/ConfigurationSystem/Client/ConfigurationServerClient.py +++ b/ConfigurationSystem/Client/ConfigurationServerClient.py @@ -10,4 +10,7 @@ class ConfigurationServerClient(Client): Placeholder client to speak with ConfigurationServer. """ - pass + def __init__(self, **kwargs): + if 'url' not in kwargs: + kwargs['url'] = 'Configuration/Server' + super(ConfigurationServerClient, self).__init__(**kwargs) diff --git a/ConfigurationSystem/private/ServiceInterface.py b/ConfigurationSystem/private/ServiceInterface.py index d7fcfd450b4..40b9c624ca2 100755 --- a/ConfigurationSystem/private/ServiceInterface.py +++ b/ConfigurationSystem/private/ServiceInterface.py @@ -36,6 +36,12 @@ def __init__(self, sURL): self.__loadConfigurationData() self.dAliveSlaveServers = {} + def isMaster(self): + """ + Just returns if master server or not + """ + return gConfigurationData.isMaster() + def __loadConfigurationData(self): """ As the name said, it just load the configuration