Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
""" :mod: PilotLoggingAgent
Comment thread
martynia marked this conversation as resolved.

PilotLoggingAgent sends Pilot log files to an SE
"""

# # imports
import os, requests
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation
from DIRAC.DataManagementSystem.Client.DataManager import DataManager


class PilotLoggingAgent(AgentModule):
    """
    .. class:: PilotLoggingAgent

    The agent sends completed pilot log files to permanent storage for analysis.
    """

    # Seconds to wait for the metadata request before giving up, so that a
    # non-responding server cannot block the agent initialisation forever.
    requestTimeout = 60

    def initialize(self):
        """
        Agent's initialisation. Use this agent's CS information to:

        * determine which shifter proxy to use (``ShifterName`` option),
        * get the target SE name (``UploadSE`` option),
        * obtain the log file location from the server given by the ``DownloadLocation`` option.

        :param self: self reference
        :return: S_OK, or S_ERROR if ``DownloadLocation`` is not set, the server call fails,
                 the server reports an error or the returned metadata contains no ``LogPath``.
        :rtype: dict
        """

        # get shifter proxy for uploads (VO-specific shifter from the Defaults CS section)
        self.shifterName = self.am_getOption("ShifterName", "GridPPLogManager")
        self.am_setOption("shifterProxy", self.shifterName)
        self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk")

        certAndKeyLocation = getHostCertificateAndKeyLocation()
        casLocation = getCAsLocation()

        data = {"method": "getMetadata"}
        self.server = self.am_getOption("DownloadLocation", None)

        if not self.server:
            return S_ERROR("No DownloadLocation set in the CS !")
        try:
            with requests.post(
                self.server,
                data=data,
                verify=casLocation,
                cert=certAndKeyLocation,
                timeout=self.requestTimeout,
            ) as res:
                if res.status_code not in (200, 202):
                    message = "Could not get metadata from %s: status %s" % (self.server, res.status_code)
                    self.log.error(message)
                    return S_ERROR(message)
                resDict = res.json()
        except Exception as exc:
            message = "Call to server %s failed" % (self.server,)
            self.log.exception(message, lException=exc)
            return S_ERROR(message)

        # The reply is expected to be an S_OK/S_ERROR style dictionary; do not
        # trust its shape, a malformed reply must not crash the initialisation.
        if not isinstance(resDict, dict):
            message = "Unexpected metadata reply from %s" % (self.server,)
            self.log.error(message)
            return S_ERROR(message)
        if not resDict.get("OK"):
            return S_ERROR(resDict.get("Message", "Server %s returned an error" % (self.server,)))

        # A plugin may legitimately return metadata without a LogPath (e.g. an
        # empty dict), in which case there is nothing this agent can upload.
        meta = resDict.get("Value")
        logPath = meta.get("LogPath") if isinstance(meta, dict) else None
        if not logPath:
            message = "No LogPath in the metadata returned by %s" % (self.server,)
            self.log.error(message)
            return S_ERROR(message)

        self.pilotLogPath = logPath
        self.log.info("Pilot log files location = %s " % self.pilotLogPath)
        return S_OK()

    def execute(self):
        """
        Execute one agent cycle. Upload log files to the SE and register them in the DFC.

        Only regular files in the pilot log directory whose name ends with "log" are uploaded.
        A failed upload is logged and does not stop the processing of the remaining files.

        :param self: self reference
        :return: S_OK, or S_ERROR if the pilot log directory cannot be listed.
        :rtype: dict
        """

        self.log.info("Pilot files upload cycle started.")
        try:
            entries = os.listdir(self.pilotLogPath)
        except OSError as exc:
            message = "Cannot list the pilot log directory %s" % (self.pilotLogPath,)
            self.log.exception(message, lException=exc)
            return S_ERROR(message)

        files = [
            f for f in entries if f.endswith("log") and os.path.isfile(os.path.join(self.pilotLogPath, f))
        ]
        if not files:
            self.log.info("No pilot log files to upload.")
            return S_OK()

        # One DataManager per cycle is enough, no need to rebuild it for every file.
        dataManager = DataManager()
        for elem in files:
            lfn = os.path.join("/gridpp/pilotlogs/", elem)
            name = os.path.join(self.pilotLogPath, elem)
            res = dataManager.putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True)
            if not res["OK"]:
                self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res["Message"]))
            else:
                self.log.info("File uploaded: ", "LFN = %s" % res["Value"])
                # NOTE(review): the local file is intentionally left in place (its removal
                # was disabled in the original code), so it is uploaded again, with
                # overwrite, on the next cycle. TODO: confirm and enable the clean-up.
        return S_OK()
7 changes: 5 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,14 +988,17 @@ def _getPilotOptions(self, queue, **kwargs):
else:
self.log.info("DIRAC project will be installed by pilots")

# Pilot Logging defined?
# Pilot Logging defined? This enables the extended (possibly remote) logger
pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False)
if pilotLogging:
pilotOptions.append("-z ")
# internal extended logger logging to debug the logger itself.
extLoggingLevel = opsHelper.getValue("/Services/JobMonitoring/extLoggerLoggingLevel", "WARNING")
pilotOptions.append("-g %s" % extLoggingLevel)

pilotOptions.append("--pythonVersion=3")

# Debug
# Debug. Both for the standard and (if enabled) extended logger.
if self.pilotLogLevel.lower() == "debug":
pilotOptions.append("-ddd")

Expand Down
15 changes: 15 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ Services
}
}
##END
TornadoPilotLogging

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an old DISET PilotLogging service that I would just remove (it was never used)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't removed it yet.

{
Protocol = https
Authorization
{
Default = authenticated
}
}
JobMonitoring
{
Port = 9130
Expand Down Expand Up @@ -190,6 +198,13 @@ Agents
PilotAccountingEnabled = yes
}
##END
##BEGIN PilotLoggingAgent
PilotLoggingAgent
{
PollingTime = 600
LogLevel = DEBUG
}
##END
JobAgent
{
FillingModeFlag = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Basic Pilot logging plugin. Just log messages.
"""
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class BasicPilotLoggingPlugin(object):
    """
    No-op fallback plugin, used when no other plugin is configured for remote pilot logging.

    It provides the three methods expected from a pilot logging plugin (``sendMessage``,
    ``finaliseLogs`` and ``getMeta``). Other plugins may inherit from this class and
    override only the methods they need.
    """

    def __init__(self):
        """
        Emit a warning that only the basic, local debug-level logging is active.
        """
        sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.")

    def sendMessage(self, message):
        """
        Write the message to the local logger at debug level; nothing is stored or forwarded.

        :param message: text to log
        :type message: str
        :return: S_OK with a confirmation string
        :rtype: dict
        """
        sLog.debug(message)
        confirmation = S_OK("Message sent")
        return confirmation

    def finaliseLogs(self, payload):
        """
        Placeholder finaliser: the payload is ignored.

        :param payload: finaliser data, not used by this plugin
        :return: S_OK with a fixed string
        :rtype: dict
        """
        outcome = S_OK("Finaliser!")
        return outcome

    def getMeta(self):
        """
        Placeholder metadata getter: this plugin has no metadata to report.

        :return: S_OK with an empty dict
        :rtype: dict
        """
        noMetadata = {}
        return S_OK(noMetadata)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
File cache Pilot logging plugin. Log records are appended to a file, one for each pilot.
"""
import os, json, re
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class FileCacheLoggingPlugin(object):
    """
    File cache logging. Log records are appended to a file, one for each pilot.
    It is assumed that an agent will be installed together with this plugin, which will copy
    the files to a safe place and clear the cache.
    """

    def __init__(self):
        """
        Set the pilot log files location (``pilotlogs`` under the current working directory)
        and create the directory if it does not exist yet.
        """
        # UUID pattern. The pilot UUID is used as a file name, so it is anchored with \Z
        # rather than $: $ would also accept a value ending with a newline character.
        self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z")
        self.meta = {}
        logPath = os.path.join(os.getcwd(), "pilotlogs")
        self.meta["LogPath"] = logPath
        if not os.path.exists(logPath):
            os.makedirs(logPath)
        sLog.info("Pilot logging directory:", logPath)

    def _extractPilotUUID(self, jsonText):
        """
        Decode a JSON document and return its validated ``pilotUUID`` value.

        :param jsonText: JSON representation of a dict containing a ``pilotUUID`` key
        :type jsonText: str
        :return: S_OK with the pilot UUID, or S_ERROR if the text is not valid JSON
                 or the UUID does not match the UUID pattern
        :rtype: dict
        """
        try:
            content = json.loads(jsonText)
        except (TypeError, ValueError) as err:
            sLog.error("Cannot decode the JSON payload:", str(err))
            return S_ERROR("Payload is not valid JSON")

        # Valid JSON is not necessarily an object: a list or a number has no keys.
        pilotUUID = content.get("pilotUUID", "Unspecified_ID") if isinstance(content, dict) else None
        try:
            valid = self.pattern.match(pilotUUID)
        except TypeError:
            # pilotUUID is not a string (e.g. a number, a list or None)
            valid = None
        if not valid:
            sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (pilotUUID,))
            return S_ERROR("Pilot UUID is invalid")
        return S_OK(pilotUUID)

    def sendMessage(self, message):
        """
        File cache sendMessage method. Append the message to the log file of the pilot
        identified by the ``pilotUUID`` key of the message.

        :param message: text to log, a JSON representation of a dict
        :type message: str
        :return: S_OK, or S_ERROR if the message or its pilot UUID is invalid
                 or the log file cannot be written
        :rtype: dict
        """
        sLog.info(message)
        result = self._extractPilotUUID(message)
        if not result["OK"]:
            return result
        pilotUUID = result["Value"]

        # open() is inside the try block as well: it can fail just like write()
        try:
            with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog:
                pilotLog.write(message + "\n")
        except (IOError, OSError) as ioerr:
            sLog.error("Error writing to log file:", str(ioerr))
            return S_ERROR(str(ioerr))
        return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,))

    def finaliseLogs(self, payload):
        """
        Finalise a log file by renaming it to ``<pilotUUID>.log``.
        Finalised logfile can be copied to a secure location.

        :param payload: payload containing the log filename under the ``pilotUUID`` key
        :type payload: json representation of dict
        :return: S_OK, or S_ERROR if the payload or its pilot UUID is invalid
                 or the file cannot be renamed
        :rtype: dict
        """
        result = self._extractPilotUUID(payload)
        if not result["OK"]:
            return result
        logfile = result["Value"]

        filepath = self.meta["LogPath"]
        try:
            os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log"))
        except OSError as err:
            sLog.error("Cannot finalise the log file:", str(err))
            return S_ERROR(str(err))
        return S_OK("Log file finalised for pilot: %s" % (logfile,))

    def getMeta(self):
        """
        Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain.

        :return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined.
        :rtype: dict
        """
        if "LogPath" in self.meta:
            return S_OK(self.meta)
        return S_ERROR("No Pilot logging directory defined")
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
""" Tornado-based HTTPs PilotLogging service.
"""


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import os, json
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Tornado.Server.TornadoService import TornadoService
from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader

sLog = gLogger.getSubLogger(__name__)


class TornadoPilotLoggingHandler(TornadoService):
    """
    Pilot logging service handler. Every exported method delegates to the logging plugin
    object instantiated in :meth:`initializeHandler`.
    """

    log = sLog

    @classmethod
    def initializeHandler(cls, infoDict):
        """
        Called once, at the first request. Load and instantiate the logging plugin, then
        create a directory where pilot logs will be stored.

        The plugin class name is taken from the ``LoggingPlugin`` service option and
        defaults to ``BasicPilotLoggingPlugin``.

        :param infoDict: service information, passed on to ``getServiceOption``
        :return: the failed ``loadObject`` result if the plugin cannot be loaded, None otherwise
        """

        cls.log.info("Handler initialised ...")
        cls.log.debug("with a dict: ", str(infoDict))

        pluginName = getServiceOption(infoDict, "LoggingPlugin", "BasicPilotLoggingPlugin")
        loaded = ObjectLoader().loadObject("WorkloadManagementSystem.Service.%s" % (pluginName,), pluginName)
        if not loaded["OK"]:
            cls.log.error("Failed to load LoggingPlugin", "%s: %s" % (pluginName, loaded["Message"]))
            return loaded

        cls.loggingPlugin = loaded["Value"]()
        cls.log.info("Loaded: PilotLoggingPlugin class", pluginName)

        pilotLogDir = os.path.join(os.getcwd(), "pilotlogs")
        cls.meta = {"LogPath": pilotLogDir}
        if not os.path.exists(pilotLogDir):
            os.makedirs(pilotLogDir)
        cls.log.info("Pilot logging directory:", pilotLogDir)

    def initializeRequest(self):
        """
        Called for each request. Only writes a log record.

        :return: None
        """

        self.log.info("Request initialised.. ")

    auth_sendMessage = ["Operator", "Pilot", "GenericPilot"]

    def export_sendMessage(self, message):
        """
        Log the message and hand it over to the logging plugin.

        :param message: message sent by a client, in JSON format
        :return: whatever the plugin's ``sendMessage`` returns (S_OK or S_ERROR)
        """
        self.log.info("Message: ", message)
        return self.loggingPlugin.sendMessage(message)

    auth_getMetadata = ["Operator", "TrustedHost"]

    def export_getMetadata(self):
        """
        Get PilotLoggingHandler metadata. Intended to be used by a client or an agent.

        :return: the result of the plugin's ``getMeta`` call
        """
        metadata = self.loggingPlugin.getMeta()
        return metadata

    auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"]

    def export_finaliseLogs(self, payload):
        """
        Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.

        :param payload: data passed to the plugin finaliser, a string in the file cache plugin.
        :type payload: str or dict
        :return: S_OK or S_ERROR (via the plugin involved)
        :rtype: dict
        """
        outcome = self.loggingPlugin.finaliseLogs(payload)
        return outcome
Loading