diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
new file mode 100644
index 00000000000..e3bf17b55aa
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
@@ -0,0 +1,94 @@
+""" :mod: PilotLoggingAgent
+
+    PilotLoggingAgent sends Pilot log files to an SE
+"""
+
+# # imports
+import os, requests
+from DIRAC import S_OK, S_ERROR
+from DIRAC.Core.Base.AgentModule import AgentModule
+from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation
+from DIRAC.DataManagementSystem.Client.DataManager import DataManager
+
+
+class PilotLoggingAgent(AgentModule):
+    """
+    .. class:: PilotLoggingAgent
+
+    The agent sends completed pilot log files to permanent storage for analysis.
+    """
+
+    def initialize(self):
+        """
+        Agent's initialisation. Use this agent's CS information to:
+        determine which Defaults/Shifter shifter proxy to use,
+        get the target SE name from the CS and
+        obtain the log file location from Tornado.
+
+        :param self: self reference
+        :return: S_OK, or S_ERROR if the log file location cannot be obtained
+        """
+
+        # get shifter proxy for uploads (VO-specific shifter from the Defaults CS section)
+        self.shifterName = self.am_getOption("ShifterName", "GridPPLogManager")
+        self.am_setOption("shifterProxy", self.shifterName)
+        self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk")
+
+        certAndKeyLocation = getHostCertificateAndKeyLocation()
+        casLocation = getCAsLocation()
+
+        data = {"method": "getMetadata"}
+        self.server = self.am_getOption("DownloadLocation", None)
+
+        if not self.server:
+            return S_ERROR("No DownloadLocation set in the CS !")
+        try:
+            with requests.post(self.server, data=data, verify=casLocation, cert=certAndKeyLocation) as res:
+                if res.status_code not in (200, 202):
+                    message = "Could not get metadata from %s: status %s" % (self.server, res.status_code)
+                    self.log.error(message)
+                    return S_ERROR(message)
+                resDict = res.json()
+        except Exception as exc:
+            message = "Call to server %s failed" % (self.server,)
+            self.log.exception(message, lException=exc)
+            return S_ERROR(message)
+        if not resDict["OK"]:
+            return S_ERROR(resDict["Message"])
+        # A plugin may return empty metadata (the no-op BasicPilotLoggingPlugin does):
+        # fail with a clear message rather than with a KeyError.
+        self.pilotLogPath = resDict["Value"].get("LogPath")
+        if not self.pilotLogPath:
+            message = "No LogPath in the metadata returned by %s" % (self.server,)
+            self.log.error(message)
+            return S_ERROR(message)
+        self.log.info("Pilot log files location = %s " % self.pilotLogPath)
+        return S_OK()
+
+    def execute(self):
+        """
+        Execute one agent cycle. Upload log files to the SE and register them in the DFC.
+
+        :param self: self reference
+        :return: S_OK; a failed upload is logged and does not stop the cycle
+        """
+
+        self.log.info("Pilot files upload cycle started.")
+        files = [
+            f
+            for f in os.listdir(self.pilotLogPath)
+            if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith("log")
+        ]
+        # one DataManager is enough for the whole cycle
+        dataManager = DataManager()
+        for elem in files:
+            lfn = os.path.join("/gridpp/pilotlogs/", elem)
+            name = os.path.join(self.pilotLogPath, elem)
+            res = dataManager.putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True)
+            if not res["OK"]:
+                self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res["Message"]))
+            else:
+                self.log.info("File uploaded: ", "LFN = %s" % res["Value"])
+                # TODO: removal of the local file (os.remove(name)) is not enabled yet, so uploaded
+                # files stay in the cache and are uploaded again (overwrite=True) at every cycle.
+        return S_OK()
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
index 821072e4fb4..df7c255753c 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
@@ -988,14 +988,17 @@ def _getPilotOptions(self, queue, **kwargs):
         else:
             self.log.info("DIRAC project will be installed by pilots")
 
-        # Pilot Logging defined?
+        # Pilot Logging defined? This enables the extended (possibly remote) logger
         pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False)
         if pilotLogging:
             pilotOptions.append("-z ")
+            # internal extended logger logging to debug the logger itself.
+            extLoggingLevel = opsHelper.getValue("/Services/JobMonitoring/extLoggerLoggingLevel", "WARNING")
+            pilotOptions.append("-g %s" % extLoggingLevel)
 
         pilotOptions.append("--pythonVersion=3")
 
-        # Debug
+        # Debug. Both for the standard and (if enabled) extended logger.
         if self.pilotLogLevel.lower() == "debug":
             pilotOptions.append("-ddd")
 
diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
index eedaf4a23f8..c493b0534c7 100644
--- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
+++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
@@ -19,6 +19,14 @@ Services
     }
   }
   ##END
+  TornadoPilotLogging
+  {
+    Protocol = https
+    Authorization
+    {
+      Default = authenticated
+    }
+  }
   JobMonitoring
   {
     Port = 9130
@@ -190,6 +198,13 @@ Agents
     PilotAccountingEnabled = yes
   }
   ##END
+  ##BEGIN PilotLoggingAgent
+  PilotLoggingAgent
+  {
+    PollingTime = 600
+    LogLevel = DEBUG
+  }
+  ##END
   JobAgent
   {
     FillingModeFlag = true
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py
new file mode 100644
index 00000000000..8452e0a0851
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py
@@ -0,0 +1,51 @@
+"""
+Basic Pilot logging plugin. Just log messages.
+"""
+from DIRAC import S_OK, S_ERROR, gLogger
+
+sLog = gLogger.getSubLogger(__name__)
+
+
+class BasicPilotLoggingPlugin(object):
+    """
+    This is a no-op fallback solution class, to be used when no plugin is defined for remote logging.
+    Any pilot logger plugin could inherit from this class to receive a set of no-op methods required by
+    :class:`TornadoPilotLoggingHandler` and only overwrite needed methods.
+    """
+
+    def __init__(self):
+        """Only warn that the no-op plugin is in use."""
+        sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.")
+
+    def sendMessage(self, message):
+        """
+        Dummy sendMessage method. The message is only logged locally, at the debug level.
+
+        :param message: text to log
+        :type message: str
+        :return: S_OK
+        :rtype: dict
+        """
+        sLog.debug(message)
+        return S_OK("Message sent")
+
+    def finaliseLogs(self, payload):
+        """
+        Dummy finaliseLogs method. The payload is ignored.
+
+        :param payload: data passed to the finaliser, not used here
+        :type payload: str or dict
+        :return: S_OK
+        :rtype: dict
+        """
+
+        return S_OK("Finaliser!")
+
+    def getMeta(self):
+        """
+        Get metadata dummy method.
+
+        :return: S_OK with an empty dict
+        :rtype: dict
+        """
+        return S_OK({})
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py
new file mode 100644
index 00000000000..ea19624b20a
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py
@@ -0,0 +1,101 @@
+"""
+File cache pilot logging plugin. Log records are appended to local files, one per pilot.
+"""
+import os, json, re
+from DIRAC import S_OK, S_ERROR, gLogger
+
+sLog = gLogger.getSubLogger(__name__)
+
+
+class FileCacheLoggingPlugin(object):
+    """
+    File cache logging. Log records are appended to a file, one for each pilot.
+    It is assumed that an agent will be installed together with this plugin, which will copy
+    the files to a safe place and clear the cache.
+    """
+
+    def __init__(self):
+        """
+        Sets the pilot log files location for a WebServer.
+
+        """
+        # UUID pattern
+        self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
+        self.meta = {}
+        logPath = os.path.join(os.getcwd(), "pilotlogs")
+        self.meta["LogPath"] = logPath
+        if not os.path.exists(logPath):
+            os.makedirs(logPath)
+        sLog.info("Pilot logging directory:", logPath)
+
+    def sendMessage(self, message):
+        """
+        File cache sendMessage method. Append the message to the log file of the sending pilot.
+
+        :param message: text to log, a JSON-encoded dict with a valid "pilotUUID" entry
+        :type message: str
+        :return: S_OK, or S_ERROR if the message is malformed or cannot be written
+        :rtype: dict
+        """
+        sLog.info(message)
+        try:
+            pilotUUID = json.loads(message).get("pilotUUID", "Unspecified_ID")
+        except (ValueError, TypeError, AttributeError) as err:
+            # not JSON at all, or JSON which is not a dictionary
+            sLog.error("Malformed pilot log message:", repr(err))
+            return S_ERROR("Pilot log message is not a valid JSON dict")
+
+        # str(): a non-string UUID must be rejected by the pattern, not raise a TypeError
+        res = self.pattern.match(str(pilotUUID))
+        if not res:
+            sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (pilotUUID,))
+            return S_ERROR("Pilot UUID is invalid")
+
+        try:
+            # open() is inside the try block, it can fail as well as write()
+            with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog:
+                pilotLog.write(message + "\n")
+        except IOError as ioerr:
+            sLog.error("Error writing to log file:", str(ioerr))
+            return S_ERROR(str(ioerr))
+        return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,))
+
+    def finaliseLogs(self, payload):
+        """
+        Finalise a log file. Finalised logfile can be copied to a secure location.
+
+        :param payload: payload containing log filename (the pilot UUID).
+        :type payload: json representation of dict
+        :return: S_OK or S_ERROR
+        :rtype: dict
+        """
+
+        try:
+            logfile = json.loads(payload).get("pilotUUID", "Unspecified_ID")
+        except (ValueError, TypeError, AttributeError) as err:
+            # not JSON at all, or JSON which is not a dictionary
+            sLog.error("Malformed finaliseLogs payload:", repr(err))
+            return S_ERROR("Payload is not a valid JSON dict")
+        # str(): a non-string UUID must be rejected by the pattern, not raise a TypeError
+        res = self.pattern.match(str(logfile))
+        if not res:
+            sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,))
+            return S_ERROR("Pilot UUID is invalid")
+
+        try:
+            filepath = self.meta["LogPath"]
+            os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log"))
+            return S_OK("Log file finalised for pilot: %s" % (logfile,))
+        except Exception as err:
+            return S_ERROR(str(err))
+
+    def getMeta(self):
+        """
+        Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain.
+
+        :return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined.
+        :rtype: dict
+        """
+        if "LogPath" in self.meta:
+            return S_OK(self.meta)
+        return S_ERROR("No Pilot logging directory defined")
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py
new file mode 100644
index 00000000000..b115144b274
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py
@@ -0,0 +1,100 @@
+""" Tornado-based HTTPs PilotLogging service.
+"""
+
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+__RCSID__ = "$Id$"
+
+import os, json
+from DIRAC import gLogger, S_OK, S_ERROR
+from DIRAC.Core.Tornado.Server.TornadoService import TornadoService
+from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
+from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
+
+sLog = gLogger.getSubLogger(__name__)
+
+
+class TornadoPilotLoggingHandler(TornadoService):
+    log = sLog
+
+    @classmethod
+    def initializeHandler(cls, infoDict):
+        """
+        Called once, at the first request. Load the logging plugin and create the pilot logs directory.
+
+        :param infoDict: passed to getServiceOption to read the LoggingPlugin option
+        :return: None, or the S_ERROR returned by ObjectLoader if the plugin cannot be loaded
+        """
+
+        cls.log.info("Handler initialised ...")
+        cls.log.debug("with a dict: ", str(infoDict))
+        defaultOption, defaultClass = "LoggingPlugin", "BasicPilotLoggingPlugin"
+        configValue = getServiceOption(infoDict, defaultOption, defaultClass)
+
+        result = ObjectLoader().loadObject("WorkloadManagementSystem.Service.%s" % (configValue,), configValue)
+        if not result["OK"]:
+            cls.log.error("Failed to load LoggingPlugin", "%s: %s" % (configValue, result["Message"]))
+            return result
+
+        componentClass = result["Value"]
+        cls.loggingPlugin = componentClass()
+        cls.log.info("Loaded: PilotLoggingPlugin class", configValue)
+
+        cls.meta = {}
+        logPath = os.path.join(os.getcwd(), "pilotlogs")
+        cls.meta["LogPath"] = logPath
+        if not os.path.exists(logPath):
+            os.makedirs(logPath)
+        cls.log.info("Pilot logging directory:", logPath)
+
+    def initializeRequest(self):
+        """
+        Called for each request.
+
+        :return: None
+        """
+
+        self.log.info("Request initialised.. ")
+
+    auth_sendMessage = ["Operator", "Pilot", "GenericPilot"]
+
+    def export_sendMessage(self, message):
+        """
+        The method logs messages to Tornado and passes them to the logging plugin.
+
+        :param message: message sent by a client, in JSON format
+        :return: S_OK or S_ERROR, as returned by the plugin
+        """
+        self.log.info("Message: ", message)
+        # the plugin returns S_OK or S_ERROR
+        result = self.loggingPlugin.sendMessage(message)
+        return result
+
+    auth_getMetadata = ["Operator", "TrustedHost"]
+
+    def export_getMetadata(self):
+        """
+        Get PilotLoggingHandler metadata. Intended to be used by a client or an agent.
+
+        :return: S_OK containing a metadata dictionary
+        """
+        return self.loggingPlugin.getMeta()
+
+    auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"]
+
+    def export_finaliseLogs(self, payload):
+        """
+        Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.
+
+        :param payload: data passed to the plugin finaliser, a string in the file cache plugin.
+        :type payload: str or dict
+        :return: S_OK or S_ERROR (via the plugin involved)
+        :rtype: dict
+        """
+
+        # The plugin returns the Dirac S_OK or S_ERROR object
+
+        return self.loggingPlugin.finaliseLogs(payload)
diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py
index efa74fd8fd3..4a3e28db75d 100644
--- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py
+++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py
@@ -244,6 +244,29 @@ def _getPilotOptionsPerSetup(self, setup, pilotDict):
                     return queueOptionRes
                 queuesDict[queue] = queueOptionRes["Value"]
             pilotDict["Setups"][setup]["Logging"]["Queues"] = queuesDict
+        elif "loggingRESTService" in pilotDict["Setups"][setup]:
+            self.log.debug(
+                "Getting option of ", "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"])
+            )
+            result = gConfig.getOption(
+                "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"])
+            )
+            if not result["OK"]:
+                return result
+            optValue = result["Value"]
+            self.log.debug("value: ", optValue)
+            tornadoService = gConfig.getOptionsDict(
+                "/Systems/%s/%s" % (pilotDict["Setups"][setup]["loggingRESTService"], optValue)
+            )
+            if not tornadoService["OK"]:
+                self.log.error(tornadoService["Message"])
+                return tornadoService
+            pilotDict["Setups"][setup]["Logging"] = {"LoggingType": "REST_API"}
+            pilotDict["Setups"][setup]["Logging"]["Port"] = tornadoService["Value"]["Port"]
+            # host ? os.environ.get('HOSTNAME') as a fallback ?
+            pilotDict["Setups"][setup]["Logging"]["Host"] = tornadoService["Value"].get(
+                "Host", os.environ.get("HOSTNAME")
+            )
 
     def syncScripts(self):
         """Clone the pilot scripts from the Pilot repositories (handle also extensions)"""