Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ConfigurationSystem/Client/CSAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Core.DISET.RPCClient import RPCClient
from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient
from DIRAC.Core.Utilities import List, Time
from DIRAC.Core.Security.X509Chain import X509Chain
from DIRAC.Core.Security import Locations
Expand Down Expand Up @@ -83,7 +83,7 @@ def initialize(self):
if not retVal['OK']:
self.__initialized = S_ERROR("Master server is not known. Is everything initialized?")
return self.__initialized
self.__rpcClient = RPCClient(gConfig.getValue("/DIRAC/Configuration/MasterServer", ""))
self.__rpcClient = ConfigurationServerClient(url=gConfig.getValue("/DIRAC/Configuration/MasterServer", ""))
self.__csMod = Modificator(self.__rpcClient, "%s - %s - %s" %
(self.__userGroup, self.__userDN, Time.dateTime().strftime("%Y-%m-%d %H:%M:%S")))
retVal = self.downloadCSData()
Expand Down
16 changes: 16 additions & 0 deletions ConfigurationSystem/Client/ConfigurationServerClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Custom Client for Configuration System
"""

from DIRAC.Core.Base.Client import Client


class ConfigurationServerClient(Client):
"""
Placeholder client to speak with ConfigurationServer.
"""

def __init__(self, **kwargs):
if 'url' not in kwargs:
kwargs['url'] = 'Configuration/Server'
super(ConfigurationServerClient, self).__init__(**kwargs)
209 changes: 133 additions & 76 deletions ConfigurationSystem/private/Refresher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
""" Refresh local CS (if needed)
In every gConfig
"""

__RCSID__ = "$Id$"
Expand Down Expand Up @@ -31,111 +30,93 @@ def _updateFromRemoteLocation(serviceClient):
return retVal


class Refresher(threading.Thread):
class RefresherBase():
"""
Code factorisation for the refresher
"""

def __init__(self):
threading.Thread.__init__(self)
self.__automaticUpdate = False
self.__lastUpdateTime = 0
self.__url = False
self.__refreshEnabled = True
self.__timeout = 60
self.__callbacks = {'newVersion': []}
gEventDispatcher.registerEvent("CSNewVersion")
self._automaticUpdate = False
self._lastUpdateTime = 0
self._url = False
self._refreshEnabled = True
self._timeout = 60
self._callbacks = {'newVersion': []}
random.seed()
self.__triggeredRefreshLock = LockRing.LockRing().getLock()
gEventDispatcher.registerEvent("CSNewVersion")

def disable(self):
self.__refreshEnabled = False
"""
Disable the refresher and prevent any request to another server
"""
self._refreshEnabled = False

def enable(self):
self.__refreshEnabled = True
if self.__lastRefreshExpired():
"""
Enable the refresher and authorize request to another server
WARNING: It will not activate automatic updates, use autoRefreshAndPublish() for that
"""
self._refreshEnabled = True
if self._lastRefreshExpired():
return self.forceRefresh()
return S_OK()

def isEnabled(self):
return self.__refreshEnabled
"""
Returns if you can use refresher or not, use automaticUpdateEnabled() to know
if refresh is automatic.
"""
return self._refreshEnabled

def addListenerToNewVersionEvent(self, functor):
gEventDispatcher.addListener("CSNewVersion", functor)

def __refreshInThread(self):
retVal = self.__refresh()
if not retVal['OK']:
gLogger.error("Error while updating the configuration", retVal['Message'])

def __lastRefreshExpired(self):
return time.time() - self.__lastUpdateTime >= gConfigurationData.getRefreshTime()

def refreshConfigurationIfNeeded(self):
if not self.__refreshEnabled or self.__automaticUpdate or not gConfigurationData.getServers():
return
self.__triggeredRefreshLock.acquire()
try:
if not self.__lastRefreshExpired():
return
self.__lastUpdateTime = time.time()
finally:
try:
self.__triggeredRefreshLock.release()
except thread.error:
pass
# Launch the refresh
thd = threading.Thread(target=self.__refreshInThread)
thd.setDaemon(1)
thd.start()
def _lastRefreshExpired(self):
"""
Just returns if last refresh must be considered as expired or not
"""
return time.time() - self._lastUpdateTime >= gConfigurationData.getRefreshTime()

def forceRefresh(self, fromMaster=False):
if self.__refreshEnabled:
return self.__refresh(fromMaster=fromMaster)
"""
Force refresh
WARNING: If refresher is disabled, force a refresh will do nothing
"""
if self._refreshEnabled:
return self._refresh(fromMaster=fromMaster)
return S_OK()

def autoRefreshAndPublish(self, sURL):
gLogger.debug("Setting configuration refresh as automatic")
if not gConfigurationData.getAutoPublish():
gLogger.debug("Slave server won't auto publish itself")
if not gConfigurationData.getName():
import DIRAC
DIRAC.abort(10, "Missing configuration name!")
self.__url = sURL
self.__automaticUpdate = True
self.setDaemon(1)
self.start()

def run(self):
while self.__automaticUpdate:
iWaitTime = gConfigurationData.getPropagationTime()
time.sleep(iWaitTime)
if self.__refreshEnabled:
if not self.__refreshAndPublish():
gLogger.error("Can't refresh configuration from any source")

def __refreshAndPublish(self):
self.__lastUpdateTime = time.time()
def _refreshAndPublish(self):
"""
Refresh configuration and publish local updates
"""
self._lastUpdateTime = time.time()
gLogger.info("Refreshing from master server")
from DIRAC.Core.DISET.RPCClient import RPCClient
sMasterServer = gConfigurationData.getMasterServer()
if sMasterServer:
oClient = RPCClient(sMasterServer, timeout=self.__timeout,
useCertificates=gConfigurationData.useServerCertificate(),
skipCACheck=gConfigurationData.skipCACheck())
from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient
oClient = ConfigurationServerClient(url=sMasterServer, timeout=self._timeout,
useCertificates=gConfigurationData.useServerCertificate(),
skipCACheck=gConfigurationData.skipCACheck())
dRetVal = _updateFromRemoteLocation(oClient)
if not dRetVal['OK']:
gLogger.error("Can't update from master server", dRetVal['Message'])
return False
if gConfigurationData.getAutoPublish():
gLogger.info("Publishing to master server...")
dRetVal = oClient.publishSlaveServer(self.__url)
dRetVal = oClient.publishSlaveServer(self._url)
if not dRetVal['OK']:
gLogger.error("Can't publish to master server", dRetVal['Message'])
return True
else:
gLogger.warn("No master server is specified in the configuration, trying to get data from other slaves")
return self.__refresh()['OK']
return self._refresh()['OK']

def __refresh(self, fromMaster=False):
self.__lastUpdateTime = time.time()
def _refresh(self, fromMaster=False):
"""
Refresh configuration
"""
self._lastUpdateTime = time.time()
gLogger.debug("Refreshing configuration...")
gatewayList = getGatewayURLs("Configuration/Server")
updatingErrorsList = []
Expand All @@ -158,10 +139,10 @@ def __refresh(self, fromMaster=False):
gLogger.debug("Randomized server list is %s" % ", ".join(randomServerList))

for sServer in randomServerList:
from DIRAC.Core.DISET.RPCClient import RPCClient
oClient = RPCClient(sServer,
useCertificates=gConfigurationData.useServerCertificate(),
skipCACheck=gConfigurationData.skipCACheck())
from DIRAC.ConfigurationSystem.Client.ConfigurationServerClient import ConfigurationServerClient
oClient = ConfigurationServerClient(url=sServer,
useCertificates=gConfigurationData.useServerCertificate(),
skipCACheck=gConfigurationData.skipCACheck())
dRetVal = _updateFromRemoteLocation(oClient)
if dRetVal['OK']:
return dRetVal
Expand All @@ -172,13 +153,89 @@ def __refresh(self, fromMaster=False):
break
return S_ERROR("Reason(s):\n\t%s" % "\n\t".join(List.uniqueElements(updatingErrorsList)))


class Refresher(RefresherBase, threading.Thread):
"""
The refresher
A long time ago, in a code away, far away...
A guy do the code to autorefresh the configuration
To prepare transition to HTTPS we have done separation
between the logic and the implementation of background
tasks, it's the original version, for diset, using thread.

"""

def __init__(self):
threading.Thread.__init__(self)
RefresherBase.__init__(self)
self._triggeredRefreshLock = LockRing.LockRing().getLock()

def _refreshInThread(self):
"""
Refreshing configration in the background. By default it use a thread but it can be
also runned in the IOLoop
"""
retVal = self._refresh()
if not retVal['OK']:
gLogger.error("Error while updating the configuration", retVal['Message'])

def refreshConfigurationIfNeeded(self):
"""
Refresh the configuration if automatic update are disabled, refresher is enabled and servers are defined
"""
if not self._refreshEnabled or self._automaticUpdate or not gConfigurationData.getServers():
return
self._triggeredRefreshLock.acquire()
try:
if not self._lastRefreshExpired():
return
self._lastUpdateTime = time.time()
finally:
try:
self._triggeredRefreshLock.release()
except thread.error:
pass
# Launch the refreshf
thd = threading.Thread(target=self._refreshInThread)
thd.setDaemon(1)
thd.start()

def autoRefreshAndPublish(self, sURL):
"""
Start the autorefresh background task

:param str sURL: URL of the configuration server
"""
gLogger.debug("Setting configuration refresh as automatic")
if not gConfigurationData.getAutoPublish():
gLogger.debug("Slave server won't auto publish itself")
if not gConfigurationData.getName():
import DIRAC
DIRAC.abort(10, "Missing configuration name!")
self._url = sURL
self._automaticUpdate = True
self.setDaemon(1)
self.start()

def run(self):
while self._automaticUpdate:
time.sleep(gConfigurationData.getPropagationTime())
if self._refreshEnabled:
if not self._refreshAndPublish():
gLogger.error("Can't refresh configuration from any source")

def daemonize(self):
"""
Daemonize the background tasks
"""

self.setDaemon(1)
self.start()


gRefresher = Refresher()


if __name__ == "__main__":
time.sleep(0.1)
gRefresher.daemonize()
Loading