From c6c8c19d1497c97a209276192fe1753b9f8d321f Mon Sep 17 00:00:00 2001 From: martynia Date: Wed, 18 Aug 2021 17:51:15 +0200 Subject: [PATCH 1/7] feat: TornadoPilotLoggingHandler for remote logging. PilotLoggingAgent to upload logs to SE. --- .../Agent/PilotLoggingAgent.py | 102 ++++++++++++++++++ .../ConfigTemplate.cfg | 8 ++ .../Service/TornadoPilotLoggingHandler.py | 89 +++++++++++++++ .../Utilities/PilotCStoJSONSynchronizer.py | 23 ++++ 4 files changed, 222 insertions(+) create mode 100644 src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py create mode 100644 src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py new file mode 100644 index 00000000000..4babc44d462 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -0,0 +1,102 @@ +""" :mod: PilotLoggingAgent + + PilotLoggingAgent sends Pilot log files to an SE +""" + +# # imports +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os + +from DIRAC import S_OK, S_ERROR +from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals +from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation +from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.WorkloadManagementSystem.Service.TornadoPilotLoggingHandler import TornadoPilotLoggingHandler +import requests + +__RCSID__ = "Id$" + +class PilotLoggingAgent(AgentModule): + """ + .. class:: PilotLoggingAgent + + The agent sends completed pilot log files to permanent storage for analysis. + """ + + def initialize(self): + """ + agent's initalisation + + :param self: self reference + """ + # Determine the VO for shifter + # The PilotLoggingAgent might use a VO-specific shifter proxy. + self.vo = self.am_getOption("VO", '') + if not self.vo: + self.vo = self.am_getOption("Community", '') + if not self.vo: + self.vo = CSGlobals.getVO() + + # The SiteDirector is for a particular user group + #self.group = self.am_getOption("Group", '') + + # get shifter proxy for uploads + self.am_setOption('shifterProxy', 'DataManager') + self.uploadSE = self.am_getOption('UploadSE', 'UKI-LT2-IC-HEP-disk') + + self.message = self.am_getOption('Message', "PilotLoggingAgent initialised.") + self.log.info("message = %s" % self.message) + self.certAndKeyLocation = getHostCertificateAndKeyLocation() + self.casLocation = getCAsLocation() + + data = {'method': 'getMetadata'} + self.server = self.am_getOption("DownloadLocation", None) + + if not self.server: + return S_ERROR("No DownloadLocation set in the CS !") + try: + with requests.post(self.server, data=data, verify=self.casLocation, cert=self.certAndKeyLocation) as res: + if res.status_code not in (200, 202): + message = "Could not get metadata from %s: status %s" % (self.server, res.status_code) + self.log.error(message) + return S_ERROR(message) + resDict = res.json() + except Exception as exc: + message = "Call to server %s failed with %s " % (self.server, exc) + self.log.error(message) + return S_ERROR(message) + if resDict['OK']: + meta = resDict['Value'] + self.pilotLogPath = meta['LogPath'] + else: + return S_ERROR(resDict['Message']) + self.log.info("Pilot log files location = %s " % self.pilotLogPath) + return S_OK() + + def execute(self): + """ execution in one agent's cycle + + :param self: self reference + """ + + self.log.info("Pilot files upload cycle started.") + files = [f for f in os.listdir(self.pilotLogPath) + if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith('log')] + for elem in files: + lfn = os.path.join('/gridpp/pilotlogs/', elem) + name = os.path.join(self.pilotLogPath, elem) + res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True) + if not res['OK']: + self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res['Message'])) + else: + self.log.info("File uploaded: ", "LFN = %s" % res['Value']) + try: + os.remove(name) + except Exception as excp: + self.log.exception("Cannot remove a local file after uploading", lException=excp) + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index eedaf4a23f8..15d023e9114 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -19,6 +19,14 @@ Services } } ##END + TornadoPilotLogging + { + Protocol = https + Authorization + { + Default = authenticated + } + } JobMonitoring { Port = 9130 diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py new file mode 100644 index 00000000000..6db0399d3b6 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -0,0 +1,89 @@ +""" Tornado-based HTTPs JobMonitoring service. +""" + + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +__RCSID__ = "$Id$" + +import os +import json +from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC.Core.Tornado.Server.TornadoService import TornadoService + +sLog = gLogger.getSubLogger(__name__) + + +class TornadoPilotLoggingHandler(TornadoService): + log = sLog + + @classmethod + def initializeHandler(cls, infosDict): + """ + Called once, at the first request. Create a directory where pilot logs will be stored. + + :param infosDict: + :return: None + """ + + TornadoPilotLoggingHandler.log.info("Handler initialised ...") + cls.meta={} + logPath = os.path.join(os.getcwd(), "pilotlogs") + cls.meta['LogPath'] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + TornadoPilotLoggingHandler.log.info("Pilot logging directory:", logPath) + + def initializeRequest(self): + """ + Called for each request. + + :return: None + """ + + self.log.info("Request initialised.. ") + + auth_sayHello = ['all'] + def export_sayHello(self): + ## Insert your method here, don't forget the return should be serializable + ## Returned value may be an S_OK/S_ERROR + ## You don't need to serialize in JSON, Tornado will do it + self.log.info('Hello...') + return S_OK('Hello!') + + auth_sendMessage = ['all'] + def export_sendMessage(self, message): + #def export_sendMessage(self, message, pilotUUID): + """ + The method logs messages to Tornado and writes pilot log files, one per pilot. + + :param message: message sent by a client, in JSON format + :param pilotUUID: pilot UUID - used to create a log file + :return: S_OK or S_ERROR if a file cannot be created or written to. + """ + ## Insert your method here, don't forget the return should be serializable + ## Returned value may be an S_OK/S_ERROR + ## You don't need to serialize in JSON, Tornado will do it + self.log.info('Message: ', message) + messageDict = json.loads(message) + pilotUUID = messageDict.get('pilotUUID', 'Unspecified_ID') + with open(os.path.join(TornadoPilotLoggingHandler.meta['LogPath'], pilotUUID), 'a') as pilotLog: + try: + pilotLog.write(message+'\n') + except IOError as ioerr: + self.log.error("Error writing to log file:", str(ioerr)) + return S_ERROR(str(ioerr)) + return S_OK('Message logged successfully for pilot: %s' % (pilotUUID,)) + + auth_getMetadata = ['all'] + def export_getMetadata(self): + """ + Get PilotLoggingHandler metadata. + + :return: S_OK containing a metadata dictionary + """ + if 'LogPath' in TornadoPilotLoggingHandler.meta: + return S_OK(TornadoPilotLoggingHandler.meta) + return S_ERROR("No Pilot logging directory defined") diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py index efa74fd8fd3..4a3e28db75d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py @@ -244,6 +244,29 @@ def _getPilotOptionsPerSetup(self, setup, pilotDict): return queueOptionRes queuesDict[queue] = queueOptionRes["Value"] pilotDict["Setups"][setup]["Logging"]["Queues"] = queuesDict + elif "loggingRESTService" in pilotDict["Setups"][setup]: + self.log.debug( + "Getting option of ", "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"]) + ) + result = gConfig.getOption( + "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"]) + ) + if not result["OK"]: + return result + optValue = result["Value"] + self.log.debug("value: ", optValue) + tornadoService = gConfig.getOptionsDict( + "/Systems/%s/%s" % (pilotDict["Setups"][setup]["loggingRESTService"], optValue) + ) + if not tornadoService["OK"]: + self.log.error(tornadoService["Message"]) + return tornadoService + pilotDict["Setups"][setup]["Logging"] = {"LoggingType": "REST_API"} + pilotDict["Setups"][setup]["Logging"]["Port"] = tornadoService["Value"]["Port"] + # host ? os.environ.get('HOSTNAME') as a fallback ? + pilotDict["Setups"][setup]["Logging"]["Host"] = tornadoService["Value"].get( + "Host", os.environ.get("HOSTNAME") + ) def syncScripts(self): """Clone the pilot scripts from the Pilot repositories (handle also extensions)""" From 7d783299517fda3bb71a13ede4c070ffa313f5f1 Mon Sep 17 00:00:00 2001 From: martynia Date: Wed, 16 Mar 2022 14:14:17 +0100 Subject: [PATCH 2/7] feat: add log finaliser; use a named shifter from Defaults/Shifter --- .../Agent/PilotLoggingAgent.py | 147 ++++++++-------- .../Agent/SiteDirector.py | 7 +- .../Service/TornadoPilotLoggingHandler.py | 162 ++++++++++-------- 3 files changed, 171 insertions(+), 145 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py index 4babc44d462..66fc69aecb9 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -9,6 +9,7 @@ from __future__ import print_function import os +import json from DIRAC import S_OK, S_ERROR from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals @@ -21,82 +22,82 @@ __RCSID__ = "Id$" -class PilotLoggingAgent(AgentModule): - """ - .. class:: PilotLoggingAgent - - The agent sends completed pilot log files to permanent storage for analysis. - """ - def initialize(self): +class PilotLoggingAgent(AgentModule): """ - agent's initalisation + .. class:: PilotLoggingAgent - :param self: self reference - """ - # Determine the VO for shifter - # The PilotLoggingAgent might use a VO-specific shifter proxy. - self.vo = self.am_getOption("VO", '') - if not self.vo: - self.vo = self.am_getOption("Community", '') - if not self.vo: - self.vo = CSGlobals.getVO() - - # The SiteDirector is for a particular user group - #self.group = self.am_getOption("Group", '') - - # get shifter proxy for uploads - self.am_setOption('shifterProxy', 'DataManager') - self.uploadSE = self.am_getOption('UploadSE', 'UKI-LT2-IC-HEP-disk') - - self.message = self.am_getOption('Message', "PilotLoggingAgent initialised.") - self.log.info("message = %s" % self.message) - self.certAndKeyLocation = getHostCertificateAndKeyLocation() - self.casLocation = getCAsLocation() - - data = {'method': 'getMetadata'} - self.server = self.am_getOption("DownloadLocation", None) - - if not self.server: - return S_ERROR("No DownloadLocation set in the CS !") - try: - with requests.post(self.server, data=data, verify=self.casLocation, cert=self.certAndKeyLocation) as res: - if res.status_code not in (200, 202): - message = "Could not get metadata from %s: status %s" % (self.server, res.status_code) - self.log.error(message) - return S_ERROR(message) - resDict = res.json() - except Exception as exc: - message = "Call to server %s failed with %s " % (self.server, exc) - self.log.error(message) - return S_ERROR(message) - if resDict['OK']: - meta = resDict['Value'] - self.pilotLogPath = meta['LogPath'] - else: - return S_ERROR(resDict['Message']) - self.log.info("Pilot log files location = %s " % self.pilotLogPath) - return S_OK() - - def execute(self): - """ execution in one agent's cycle - - :param self: self reference + The agent sends completed pilot log files to permanent storage for analysis. """ - self.log.info("Pilot files upload cycle started.") - files = [f for f in os.listdir(self.pilotLogPath) - if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith('log')] - for elem in files: - lfn = os.path.join('/gridpp/pilotlogs/', elem) - name = os.path.join(self.pilotLogPath, elem) - res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True) - if not res['OK']: - self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res['Message'])) - else: - self.log.info("File uploaded: ", "LFN = %s" % res['Value']) + def initialize(self): + """ + agent's initalisation. Use this agent's CS information to: + Determine what Defaults/Shifter shifter proxy to use., + get the target SE name from the CS. + Obtain log file location from Tornado. + + :param self: self reference + """ + + # get shifter proxy for uploads (VO-specific shifter from the Defaults CS section) + self.shifterName = self.am_getOption("ShifterName", "GridPPLogManager") + self.am_setOption("shifterProxy", self.shifterName) + self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk") + + self.message = self.am_getOption("Message", "PilotLoggingAgent initialised.") + self.log.info("message = %s" % self.message) + self.certAndKeyLocation = getHostCertificateAndKeyLocation() + self.casLocation = getCAsLocation() + + data = {"method": "getMetadata"} + self.server = self.am_getOption("DownloadLocation", None) + + if not self.server: + return S_ERROR("No DownloadLocation set in the CS !") try: - os.remove(name) - except Exception as excp: - self.log.exception("Cannot remove a local file after uploading", lException=excp) - return S_OK() + with requests.post(self.server, data=data, verify=self.casLocation, cert=self.certAndKeyLocation) as res: + if res.status_code not in (200, 202): + message = "Could not get metadata from %s: status %s" % (self.server, res.status_code) + self.log.error(message) + return S_ERROR(message) + resDict = res.json() + except Exception as exc: + message = "Call to server %s failed with %s " % (self.server, exc) + self.log.error(message) + return S_ERROR(message) + if resDict["OK"]: + meta = resDict["Value"] + self.pilotLogPath = meta["LogPath"] + else: + return S_ERROR(resDict["Message"]) + self.log.info("Pilot log files location = %s " % self.pilotLogPath) + return S_OK() + + def execute(self): + """ + Execute one agent cycle. Upload log files to the SE and register them in the DFC. + + :param self: self reference + """ + + self.log.info("Pilot files upload cycle started.") + files = [ + f + for f in os.listdir(self.pilotLogPath) + if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith("log") + ] + for elem in files: + lfn = os.path.join("/gridpp/pilotlogs/", elem) + name = os.path.join(self.pilotLogPath, elem) + res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True) + if not res["OK"]: + self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res["Message"])) + else: + self.log.info("File uploaded: ", "LFN = %s" % res["Value"]) + try: + pass + # os.remove(name) + except Exception as excp: + self.log.exception("Cannot remove a local file after uploading", lException=excp) + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 821072e4fb4..df7c255753c 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -988,14 +988,17 @@ def _getPilotOptions(self, queue, **kwargs): else: self.log.info("DIRAC project will be installed by pilots") - # Pilot Logging defined? + # Pilot Logging defined? This enables the extended (possibly remote) logger pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False) if pilotLogging: pilotOptions.append("-z ") + # internal extended logger logging to debug the logger itself. + extLoggingLevel = opsHelper.getValue("/Services/JobMonitoring/extLoggerLoggingLevel", "WARNING") + pilotOptions.append("-g %s" % extLoggingLevel) pilotOptions.append("--pythonVersion=3") - # Debug + # Debug. Both for the standard and (if enabled) extended logger. if self.pilotLogLevel.lower() == "debug": pilotOptions.append("-ddd") diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index 6db0399d3b6..a2f99efe5ba 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -17,73 +17,95 @@ class TornadoPilotLoggingHandler(TornadoService): - log = sLog - - @classmethod - def initializeHandler(cls, infosDict): - """ - Called once, at the first request. Create a directory where pilot logs will be stored. - - :param infosDict: - :return: None - """ - - TornadoPilotLoggingHandler.log.info("Handler initialised ...") - cls.meta={} - logPath = os.path.join(os.getcwd(), "pilotlogs") - cls.meta['LogPath'] = logPath - if not os.path.exists(logPath): - os.makedirs(logPath) - TornadoPilotLoggingHandler.log.info("Pilot logging directory:", logPath) - - def initializeRequest(self): - """ - Called for each request. - - :return: None - """ - - self.log.info("Request initialised.. ") - - auth_sayHello = ['all'] - def export_sayHello(self): - ## Insert your method here, don't forget the return should be serializable - ## Returned value may be an S_OK/S_ERROR - ## You don't need to serialize in JSON, Tornado will do it - self.log.info('Hello...') - return S_OK('Hello!') - - auth_sendMessage = ['all'] - def export_sendMessage(self, message): - #def export_sendMessage(self, message, pilotUUID): - """ - The method logs messages to Tornado and writes pilot log files, one per pilot. - - :param message: message sent by a client, in JSON format - :param pilotUUID: pilot UUID - used to create a log file - :return: S_OK or S_ERROR if a file cannot be created or written to. - """ - ## Insert your method here, don't forget the return should be serializable - ## Returned value may be an S_OK/S_ERROR - ## You don't need to serialize in JSON, Tornado will do it - self.log.info('Message: ', message) - messageDict = json.loads(message) - pilotUUID = messageDict.get('pilotUUID', 'Unspecified_ID') - with open(os.path.join(TornadoPilotLoggingHandler.meta['LogPath'], pilotUUID), 'a') as pilotLog: - try: - pilotLog.write(message+'\n') - except IOError as ioerr: - self.log.error("Error writing to log file:", str(ioerr)) - return S_ERROR(str(ioerr)) - return S_OK('Message logged successfully for pilot: %s' % (pilotUUID,)) - - auth_getMetadata = ['all'] - def export_getMetadata(self): - """ - Get PilotLoggingHandler metadata. - - :return: S_OK containing a metadata dictionary - """ - if 'LogPath' in TornadoPilotLoggingHandler.meta: - return S_OK(TornadoPilotLoggingHandler.meta) - return S_ERROR("No Pilot logging directory defined") + log = sLog + + @classmethod + def initializeHandler(cls, infosDict): + """ + Called once, at the first request. Create a directory where pilot logs will be stored. + + :param infosDict: + :return: None + """ + + TornadoPilotLoggingHandler.log.info("Handler initialised ...") + cls.meta = {} + logPath = os.path.join(os.getcwd(), "pilotlogs") + cls.meta["LogPath"] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + TornadoPilotLoggingHandler.log.info("Pilot logging directory:", logPath) + + def initializeRequest(self): + """ + Called for each request. + + :return: None + """ + + self.log.info("Request initialised.. ") + + auth_sayHello = ["all"] + + def export_sayHello(self): + ## Insert your method here, don't forget the return should be serializable + ## Returned value may be an S_OK/S_ERROR + ## You don't need to serialize in JSON, Tornado will do it + self.log.info("Hello...") + return S_OK("Hello!") + + auth_sendMessage = ["all"] + + def export_sendMessage(self, message): + # def export_sendMessage(self, message, pilotUUID): + """ + The method logs messages to Tornado and writes pilot log files, one per pilot. + + :param message: message sent by a client, in JSON format + :param pilotUUID: pilot UUID - used to create a log file + :return: S_OK or S_ERROR if a file cannot be created or written to. + """ + ## Insert your method here, don't forget the return should be serializable + ## Returned value may be an S_OK/S_ERROR + ## You don't need to serialize in JSON, Tornado will do it + self.log.info("Message: ", message) + messageDict = json.loads(message) + pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID") + with open(os.path.join(TornadoPilotLoggingHandler.meta["LogPath"], pilotUUID), "a") as pilotLog: + try: + pilotLog.write(message + "\n") + except IOError as ioerr: + self.log.error("Error writing to log file:", str(ioerr)) + return S_ERROR(str(ioerr)) + return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) + + auth_getMetadata = ["all"] + + def export_getMetadata(self): + """ + Get PilotLoggingHandler metadata. + + :return: S_OK containing a metadata dictionary + """ + if "LogPath" in TornadoPilotLoggingHandler.meta: + return S_OK(TornadoPilotLoggingHandler.meta) + return S_ERROR("No Pilot logging directory defined") + + auth_finaliseLogs = ["all"] + + def export_finaliseLogs(self, payload): + """ + Finalise a log file. Finalised logfile can be copied to a secure location. + + :param logfile: log filename + :type logfile: str + :return: S_OK or S_ERROR + :rtype: dict + """ + try: + logfile = json.loads(payload) + filepath = TornadoPilotLoggingHandler.meta["LogPath"] + os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) + return S_OK("Log file finalised for pilot: %s " % (logfile,)) + except Exception as err: + return S_ERROR(str(err)) From b93be7a743b03e6c7c205841a75a28830ce9beb0 Mon Sep 17 00:00:00 2001 From: martynia Date: Wed, 23 Mar 2022 16:45:49 +0100 Subject: [PATCH 3/7] feat: Introduce pilot log file cache to be loaded from the handler. --- .../Service/BasicPilotLoggingPlugin.py | 51 +++++++++++++ .../Service/FileCacheLoggingPlugin.py | 76 +++++++++++++++++++ .../Service/TornadoPilotLoggingHandler.py | 63 +++++++-------- 3 files changed, 160 insertions(+), 30 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py create mode 100644 src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py diff --git a/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py new file mode 100644 index 00000000000..8452e0a0851 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py @@ -0,0 +1,51 @@ +""" +Basic Pilot logging plugin. Just log messages. +""" +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class BasicPilotLoggingPlugin(object): + """ + This is a no-op fallback solution class, to be used when no plugin is defined for remote logging. + Any pilot logger plugin could inherit from this class to receive a set of no-op methods required by + :class:`TornadoPilotLoggingHandler` and only overwrite needed methods. + """ + + def __init__(self): + + sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.") + + def sendMessage(self, message): + """ + Dummy sendMessage method. + + :param message: text to log + :type message: str + :return: None + :rtype: None + """ + sLog.debug(message) + return S_OK("Message sent") + + def finaliseLogs(self, payload): + """ + Dummy finaliseLogs method. + + :param payload: + :type payload: + :return: S_OK or S_ERROR + :rtype: dict + """ + + return S_OK("Finaliser!") + + def getMeta(self): + """ + Get metadata dummy method. + + :return: S_OK with an empty dict + :rtype: dict + """ + return S_OK({}) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py new file mode 100644 index 00000000000..f6400ffec57 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -0,0 +1,76 @@ +""" +Basic Pilot logging plugin. Just log messages. +""" +import os, json +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class FileCacheLoggingPlugin(object): + """ + File cache logging. Log records are appended to a file, one for each pilot. + It is assumed that an agent will be installed together with this plugin, which will copy + the files to a safe place and clear the cache. + """ + + def __init__(self): + """ + Sets the pilot log files location for a WebServer. + + """ + self.meta = {} + logPath = os.path.join(os.getcwd(), "pilotlogs") + self.meta["LogPath"] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + sLog.info("Pilot logging directory:", logPath) + + def sendMessage(self, message): + """ + File cache sendMessage method. + + :param message: text to log + :type message: str + :return: None + :rtype: None + """ + sLog.info(message) + messageDict = json.loads(message) + pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID") + with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog: + try: + pilotLog.write(message + "\n") + except IOError as ioerr: + self.log.error("Error writing to log file:", str(ioerr)) + return S_ERROR(str(ioerr)) + return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) + + def finaliseLogs(self, payload): + """ + Finalise a log file. Finalised logfile can be copied to a secure location. + + :param logfile: log filename + :type logfile: str + :return: S_OK or S_ERROR + :rtype: dict + """ + + try: + logfile = json.loads(payload) + filepath = self.meta["LogPath"] + os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) + return S_OK("Log file finalised for pilot: %s " % (logfile,)) + except Exception as err: + return S_ERROR(str(err)) + + def getMeta(self): + """ + Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain. + + :return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined. + :rtype: dict + """ + if "LogPath" in self.meta: + return S_OK(self.meta) + return S_ERROR("No Pilot logging directory defined") diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index a2f99efe5ba..f2087e59253 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -8,10 +8,11 @@ __RCSID__ = "$Id$" -import os -import json +import os, json from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Tornado.Server.TornadoService import TornadoService +from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader sLog = gLogger.getSubLogger(__name__) @@ -20,21 +21,34 @@ class TornadoPilotLoggingHandler(TornadoService): log = sLog @classmethod - def initializeHandler(cls, infosDict): + def initializeHandler(cls, infoDict): """ Called once, at the first request. Create a directory where pilot logs will be stored. - :param infosDict: + :param infoDict: :return: None """ - TornadoPilotLoggingHandler.log.info("Handler initialised ...") + cls.log.info("Handler initialised ...") + cls.log.debug("with a dict: ", str(infoDict)) + defaultOption, defaultClass = "LoggingPlugin", "BasicPilotLoggingPlugin" + configValue = getServiceOption(infoDict, defaultOption, defaultClass) + + result = ObjectLoader().loadObject("WorkloadManagementSystem.Service.%s" % (configValue,), configValue) + if not result["OK"]: + cls.log.error("Failed to load LoggingPlugin", "%s: %s" % (configValue, result["Message"])) + return result + + componentClass = result["Value"] + cls.loggingPlugin = componentClass() + cls.log.info("Loaded: PilotLoggingPlugin class", configValue) + cls.meta = {} logPath = os.path.join(os.getcwd(), "pilotlogs") cls.meta["LogPath"] = logPath if not os.path.exists(logPath): os.makedirs(logPath) - TornadoPilotLoggingHandler.log.info("Pilot logging directory:", logPath) + cls.log.info("Pilot logging directory:", logPath) def initializeRequest(self): """ @@ -69,43 +83,32 @@ def export_sendMessage(self, message): ## Returned value may be an S_OK/S_ERROR ## You don't need to serialize in JSON, Tornado will do it self.log.info("Message: ", message) - messageDict = json.loads(message) - pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID") - with open(os.path.join(TornadoPilotLoggingHandler.meta["LogPath"], pilotUUID), "a") as pilotLog: - try: - pilotLog.write(message + "\n") - except IOError as ioerr: - self.log.error("Error writing to log file:", str(ioerr)) - return S_ERROR(str(ioerr)) - return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) + # the plugin returns S_OK or S_ERROR + result = self.loggingPlugin.sendMessage(message) + return result auth_getMetadata = ["all"] def export_getMetadata(self): """ - Get PilotLoggingHandler metadata. + Get PilotLoggingHandler metadata. Intended to be used by a client or an agent. :return: S_OK containing a metadata dictionary """ - if "LogPath" in TornadoPilotLoggingHandler.meta: - return S_OK(TornadoPilotLoggingHandler.meta) - return S_ERROR("No Pilot logging directory defined") + return self.loggingPlugin.getMeta() auth_finaliseLogs = ["all"] def export_finaliseLogs(self, payload): """ - Finalise a log file. Finalised logfile can be copied to a secure location. + Finalise a log file. Finalised logfile can be copied to a secure location. if a file cache is used. - :param logfile: log filename - :type logfile: str - :return: S_OK or S_ERROR + :param payload: data passed to the plugin finaliser, a string in the file cache plugin. + :type payload: str or dict + :return: S_OK or S_ERROR (via the plugin involved) :rtype: dict """ - try: - logfile = json.loads(payload) - filepath = TornadoPilotLoggingHandler.meta["LogPath"] - os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) - return S_OK("Log file finalised for pilot: %s " % (logfile,)) - except Exception as err: - return S_ERROR(str(err)) + + # The plugin returns the Dirac S_OK or S_ERROR object + + return self.loggingPlugin.finaliseLogs(payload) From 963ae7486e2c49735ff2544d9360e805e8ad352f Mon Sep 17 00:00:00 2001 From: martynia Date: Fri, 25 Mar 2022 15:01:48 +0100 Subject: [PATCH 4/7] fix: remove future and other unused imports. use log.exception --- .../Agent/PilotLoggingAgent.py | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py index 66fc69aecb9..e3bf17b55aa 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -4,23 +4,11 @@ """ # # imports -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import json - +import os, requests from DIRAC import S_OK, S_ERROR -from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals from DIRAC.Core.Base.AgentModule import AgentModule -from DIRAC.Core.DISET.RPCClient import RPCClient from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation from DIRAC.DataManagementSystem.Client.DataManager import DataManager -from DIRAC.WorkloadManagementSystem.Service.TornadoPilotLoggingHandler import TornadoPilotLoggingHandler -import requests - -__RCSID__ = "Id$" class PilotLoggingAgent(AgentModule): @@ -45,10 +33,8 @@ def initialize(self): self.am_setOption("shifterProxy", self.shifterName) self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk") - self.message = self.am_getOption("Message", "PilotLoggingAgent initialised.") - self.log.info("message = %s" % self.message) - self.certAndKeyLocation = getHostCertificateAndKeyLocation() - self.casLocation = getCAsLocation() + certAndKeyLocation = getHostCertificateAndKeyLocation() + casLocation = getCAsLocation() data = {"method": "getMetadata"} self.server = self.am_getOption("DownloadLocation", None) @@ -56,15 +42,15 @@ def initialize(self): if not self.server: return S_ERROR("No DownloadLocation set in the CS !") try: - with requests.post(self.server, data=data, verify=self.casLocation, cert=self.certAndKeyLocation) as res: + with requests.post(self.server, data=data, verify=casLocation, cert=certAndKeyLocation) as res: if res.status_code not in (200, 202): message = "Could not get metadata from %s: status %s" % (self.server, res.status_code) self.log.error(message) return S_ERROR(message) resDict = res.json() except Exception as exc: - message = "Call to server %s failed with %s " % (self.server, exc) - self.log.error(message) + message = "Call to server %s failed" % (self.server,) + self.log.exception(message, lException=exc) return S_ERROR(message) if resDict["OK"]: meta = resDict["Value"] From 4b92019c6215b85199029aa74b839853ad19f737 Mon Sep 17 00:00:00 2001 From: martynia Date: Fri, 25 Mar 2022 15:10:20 +0100 Subject: [PATCH 5/7] feat: add PilotLoggingAgent definition --- src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 15d023e9114..c493b0534c7 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -198,6 +198,13 @@ Agents PilotAccountingEnabled = yes } ##END + ##BEGIN PilotLoggingAgent + PilotLoggingAgent + { + PollingTime = 600 + LogLevel = DEBUG + } + ##END JobAgent { FillingModeFlag = true From 44c18089c00d3cef5bebb5326c92a7d90699de8a Mon Sep 17 00:00:00 2001 From: martynia Date: Fri, 25 Mar 2022 15:13:34 +0100 Subject: [PATCH 6/7] feat: restrict authorisation to Pilot, GenericPilot and Operator properties. --- .../Service/TornadoPilotLoggingHandler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index f2087e59253..cf0bb3bcff5 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -59,7 +59,7 @@ def initializeRequest(self): self.log.info("Request initialised.. ") - auth_sayHello = ["all"] + auth_sayHello = ["Operator"] def export_sayHello(self): ## Insert your method here, don't forget the return should be serializable @@ -68,7 +68,7 @@ def export_sayHello(self): self.log.info("Hello...") return S_OK("Hello!") - auth_sendMessage = ["all"] + auth_sendMessage = ["Operator", "Pilot", "GenericPilot"] def export_sendMessage(self, message): # def export_sendMessage(self, message, pilotUUID): @@ -87,7 +87,7 @@ def export_sendMessage(self, message): result = self.loggingPlugin.sendMessage(message) return result - auth_getMetadata = ["all"] + auth_getMetadata = ["Operator", "TrustedHost"] def export_getMetadata(self): """ @@ -97,7 +97,7 @@ def export_getMetadata(self): """ return self.loggingPlugin.getMeta() - auth_finaliseLogs = ["all"] + auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"] def export_finaliseLogs(self, payload): """ From f4f2613a0c5518088b55c44e1c56076680ef55df Mon Sep 17 00:00:00 2001 From: martynia Date: Fri, 25 Mar 2022 15:15:09 +0100 Subject: [PATCH 7/7] feat: add regex pilotUUID verification --- .../Service/FileCacheLoggingPlugin.py | 25 ++++++++++++++----- .../Service/TornadoPilotLoggingHandler.py | 9 ------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py index f6400ffec57..ea19624b20a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -1,7 +1,7 @@ """ Basic Pilot logging plugin. Just log messages. """ -import os, json +import os, json, re from DIRAC import S_OK, S_ERROR, gLogger sLog = gLogger.getSubLogger(__name__) @@ -19,6 +19,8 @@ def __init__(self): Sets the pilot log files location for a WebServer. """ + # UUID pattern + self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") self.meta = {} logPath = os.path.join(os.getcwd(), "pilotlogs") self.meta["LogPath"] = logPath @@ -38,11 +40,17 @@ def sendMessage(self, message): sLog.info(message) messageDict = json.loads(message) pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID") + + res = self.pattern.match(pilotUUID) + if not res: + sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (pilotUUID,)) + return S_ERROR("Pilot UUID is invalid") + with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog: try: pilotLog.write(message + "\n") except IOError as ioerr: - self.log.error("Error writing to log file:", str(ioerr)) + sLog.error("Error writing to log file:", str(ioerr)) return S_ERROR(str(ioerr)) return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) @@ -50,17 +58,22 @@ def finaliseLogs(self, payload): """ Finalise a log file. Finalised logfile can be copied to a secure location. - :param logfile: log filename - :type logfile: str + :param logfile: payload containing log filename. + :type logfile: json representation of dict :return: S_OK or S_ERROR :rtype: dict """ + logfile = json.loads(payload).get("pilotUUID", "Unspecified_ID") + res = self.pattern.match(logfile) + if not res: + sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,)) + return S_ERROR("Pilot UUID is invalid") + try: - logfile = json.loads(payload) filepath = self.meta["LogPath"] os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) - return S_OK("Log file finalised for pilot: %s " % (logfile,)) + return S_OK("Log file finalised for pilot: %s" % (logfile,)) except Exception as err: return S_ERROR(str(err)) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index cf0bb3bcff5..b115144b274 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -59,15 +59,6 @@ def initializeRequest(self): self.log.info("Request initialised.. ") - auth_sayHello = ["Operator"] - - def export_sayHello(self): - ## Insert your method here, don't forget the return should be serializable - ## Returned value may be an S_OK/S_ERROR - ## You don't need to serialize in JSON, Tornado will do it - self.log.info("Hello...") - return S_OK("Hello!") - auth_sendMessage = ["Operator", "Pilot", "GenericPilot"] def export_sendMessage(self, message):