diff --git a/Pilot/MessageSender.py b/Pilot/MessageSender.py deleted file mode 100644 index 02e91eae..00000000 --- a/Pilot/MessageSender.py +++ /dev/null @@ -1,381 +0,0 @@ -""" Message sender module for the remote logging system. - It provides the general interface of the message sender and - several implementations that e.g. allows to send the message - to a REST interface or to a MQ server using the Stomp protocol. - The instances of message senders should be created by using - messageSenderFactory() that takes one string argument corresponding - to message sender type we want to create (see messageSenderFactory docstring for details) - and the optional parameters params that will be passed to sender message instance. - For parameter details see given sender message implementation. - e.g to create a message sender that will send messages to a local file 'myFile': - myLocalSender = messageSenderFactory('LOCAL_FILE', params ={'LocalOutputFile': 'myFile'}) - myLocalSender.sendMessage("blabla", "myFlag") -""" - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import logging - -try: - import requests -except ImportError: - requests = None -try: - import stomp -except ImportError: - stomp = None - -############################ -# python 2 -> 3 "hacks" -try: - import Queue as queue -except ImportError: - import queue -############################ - - -def loadAndCreateObject(moduleName, className, params): - """ - Function loads the class from the module and creates - the instance of this class by using params arguments - example usage: myObj = loadAndCreateObject('Pilot.MessageSender', 'StompSender',params) - Args: - moduleName(str): e.g. 'Pilot.MessageSender' or 'MessageSender' - className(str): e.g. 'StompSender' - params: arguments passed to __init__ of the class. - Return: - obj: Created instance of the class or None in case of errors. - """ - myObj = None - # In case of moduleName in the format of X.Y.Z, we have - # mods =['X','Y','Z']. We are really interested in loading - # 'Z' submodule. - mods = moduleName.split(".") - # The __import__ call with - # fromlist option set to mods[-1] will load Z submodule as expected. - # Simpler X format will be also covered. - module = __import__(moduleName, globals(), locals(), mods[-1]) - try: - myClass = getattr(module, className) - if params: - myObj = myClass(params) - else: - myObj = myClass() - - except AttributeError: - logging.error("Class %s not found", className) - return myObj - - -class MessageSender(object): - """General interface of message sender.""" - - def sendMessage(self, msg, flag): - """Must be implemented by children classes.""" - raise NotImplementedError - - -def messageSenderFactory(senderType, params): - """ - Function creates MessageSender according to sender type. - Args: - senderType(str): sender type to be created. - The allowed types are 'LOCAL_FILE', 'MQ', 'REST_API', - params(dict): additional parameters passed to init - Return: - MessageSender or None in case of errors - - """ - typeToModuleAndClassName = { - "LOCAL_FILE": {"module": "MessageSender", "class": "LocalFileSender"}, - "MQ": {"module": "MessageSender", "class": "StompSender"}, - "REST_API": {"module": "MessageSender", "class": "RESTSender"}, - } - try: - moduleName = typeToModuleAndClassName[senderType]["module"] - className = typeToModuleAndClassName[senderType]["class"] - - logging.debug( - "Trying to load and create object of module: %s, class: %s, params: %s", - str(moduleName), - str(className), - str(params), - ) - try: - return loadAndCreateObject(moduleName, className, params) - except ImportError: - logging.warning("Module %s not found", moduleName) - return loadAndCreateObject("Pilot." + moduleName, className, params) - - except ValueError: - logging.error("Error initializing the message sender of type %s", senderType) - return None - - -def createParamChecker(requiredKeys): - """Function returns a function that can be used to check - if the parameters in form of a dictionary contain - the required set of keys. Also it checks if the parameters - are not empty. - Args: - requiredKeys(list) - Return: - function: or None if requiredKeys is None - """ - if not requiredKeys: - return None - - def areParamsCorrect(params): - """ - Args: - params(dict): - Return: - bool: - """ - if not params: - return False - if not all(k in params for k in requiredKeys): - return False - return True - - return areParamsCorrect - - -class RESTSender(MessageSender): - """Message sender to a REST interface. - It depends on requests module. - """ - - REQUIRED_KEYS = [ - "HostKey", - "HostCertificate", - "CACertificate", - "Url", - "LocalOutputFile", - ] - - def __init__(self, params): - """ - Raises: - ValueError: If params are not correct - """ - logging.debug("in init of RESTSender") - self._areParamsCorrect = createParamChecker(self.REQUIRED_KEYS) - self.params = params - if not self._areParamsCorrect(self.params): - logging.error( - "Parameters missing needed to send messages! Parameters:%s", - str(self.params), - ) - raise ValueError("Parameters missing needed to send messages") - - def sendMessage(self, msg, flag): - url = self.params.get("Url") - hostKey = self.params.get("HostKey") - hostCertificate = self.params.get("HostCertificate") - CACertificate = self.params.get("CACertificate") - - logging.debug("sending message from the REST Sender") - try: - requests.post( - url, # pylint: disable=undefined-variable - json=msg, - cert=(hostCertificate, hostKey), - verify=CACertificate, - ) - except ( - requests.exceptions.RequestException, - IOError, - ) as e: # pylint: disable=undefined-variable - logging.error(e) - return False - return True - - -def eraseFileContent(filename): - """Erases the content of a given file.""" - with open(filename, "w+") as myFile: - myFile.truncate() - - -def saveMessageToFile(msg, filename="myLocalQueueOfMessages"): - """Adds the message to a file appended as a next line.""" - with open(filename, "a+") as myFile: - myFile.write(msg + "\n") - - -def readMessagesFromFileAndEraseFileContent(filename="myLocalQueueOfMessages"): - """Generates the queue FIFO and fills it - with values from the file, assuming that one line - corresponds to one message. - Finally the file content is erased. - Returns: - Queue: - """ - rqueue = queue.Queue() - with open(filename, "r") as myFile: - for line in myFile: - rqueue.put(line) - eraseFileContent(filename) - return rqueue - - -class LocalFileSender(MessageSender): - """Message sender to a local file.""" - - REQUIRED_KEYS = ["LocalOutputFile"] - - def __init__(self, params): - """ - Raises: - ValueError: If params are not correct. - """ - logging.debug("in init of LocalFileSender") - self._areParamsCorrect = createParamChecker(self.REQUIRED_KEYS) - self.params = params - if not self._areParamsCorrect(self.params): - logging.error( - "Parameters missing needed to send messages! Parameters:%s", - str(self.params), - ) - raise ValueError("Parameters missing needed to send messages") - - def sendMessage(self, msg, flag): - logging.debug("in sendMessage of LocalFileSender") - filename = self.params.get("LocalOutputFile") - saveMessageToFile(msg, filename=filename) - return True - - -class StompSender(MessageSender): - """Stomp message sender. - It depends on stomp module. - """ - - REQUIRED_KEYS = [ - "HostKey", - "HostCertificate", - "CACertificate", - "QueuePath", - "LocalOutputFile", - ] - - def __init__(self, params): - """ - Raises: - ValueError: If params are not correct. - """ - - self._areParamsCorrect = createParamChecker(self.REQUIRED_KEYS) - self.params = params - if not self._areParamsCorrect(self.params): - logging.error( - "Parameters missing needed to send messages! Parameters:%s", - str(self.params), - ) - raise ValueError("Parameters missing needed to send messages") - - def sendMessage(self, msg, flag): - """Method first copies the message content to the - local storage, then it checks if the connection - to the MQ server is up, - If it is the case it sends all messages stored - locally. The string flag can be used as routing_key, - it can contain: 'info', 'warning', 'error', - 'debug'. If the connection is down, the method - does nothing and returns False - Returns: - bool: False in case of any errors, True otherwise - """ - - wqueue = self.params.get("QueuePath") - host = self.params.get("Host") - port = int(self.params.get("Port")) - hostKey = self.params.get("HostKey") - hostCertificate = self.params.get("HostCertificate") - CACertificate = self.params.get("CACertificate") - filename = self.params.get("LocalOutputFile") - - saveMessageToFile(msg, filename) - connection = self._connect( - (host, port), - { - "key_file": hostKey, - "cert_file": hostCertificate, - "ca_certs": CACertificate, - }, - ) - if not connection: - return False - self._sendAllLocalMessages(connection, wqueue, filename) - self._disconnect(connection) - return True - - def _connect(self, hostAndPort, sslCfg): - """Connects to MQ server and returns connection - handler or None in case of connection down. - Stomp-depended function. - Args: - hostAndPort(list): of tuples, containing ip address and the port - where the message broker is listening for stomp - connections. e.g. [(127.0.0.1,6555)] - sslCfg(dict): with three keys 'key_file', 'cert_file', and 'ca_certs'. - Return: - stomp.Connection: or None in case of errors. - """ - if not sslCfg: - logging.error("sslCfg argument is None") - return None - if not hostAndPort: - logging.error("hostAndPort argument is None") - return None - if not all(key in sslCfg for key in ["key_file", "cert_file", "ca_certs"]): - logging.error("Missing sslCfg keys") - return None - - try: - connection = stomp.Connection( - host_and_ports=hostAndPort, use_ssl=True - ) # pylint: disable=undefined-variable - connection.set_ssl( - for_hosts=hostAndPort, - key_file=sslCfg["key_file"], - cert_file=sslCfg["cert_file"], - ca_certs=sslCfg["ca_certs"], - ) - connection.start() # pylint: disable=no-member - connection.connect() # pylint: disable=no-member - return connection - except stomp.exception.ConnectFailedException: # pylint: disable=undefined-variable - logging.error("Connection error") - return None - except IOError: - logging.error("Could not find files with ssl certificates") - return None - - def _send(self, msg, destination, connectHandler): - """Sends a message and logs info. - Stomp-depended function. - """ - if not connectHandler: - return False - connectHandler.send(destination=destination, body=msg) - logging.info(" [x] Sent %r ", msg) - return True - - def _disconnect(self, connectHandler): - """Disconnects. - Stomp-depended function. - """ - connectHandler.disconnect() - - def _sendAllLocalMessages(self, connectHandler, destination, filename): - """Retrieves all messages from the local storage - and sends it. - """ - rqueue = readMessagesFromFileAndEraseFileContent(filename) - while not rqueue.empty(): - msg = rqueue.get() - self._send(msg, destination, connectHandler) diff --git a/Pilot/PilotLogger.py b/Pilot/PilotLogger.py deleted file mode 100644 index ebb02db3..00000000 --- a/Pilot/PilotLogger.py +++ /dev/null @@ -1,224 +0,0 @@ -""" Pilot logger module for the remote logging system. -""" - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import os -import logging - -############################ -# python 2 -> 3 "hacks" -try: - from argparse import ArgumentTypeError - from argparse import ArgumentParser - from argparse import RawTextHelpFormatter -except ImportError: # argparse is from python 2.7+ - from optparse import OptParseError as ArgumentTypeError - from optparse import OptionParser as ArgumentParser - -try: - from Pilot.PilotLoggerTools import generateDict, encodeMessage - from Pilot.PilotLoggerTools import generateTimeStamp - from Pilot.PilotLoggerTools import isMessageFormatCorrect - from Pilot.PilotLoggerTools import readPilotJSONConfigFile - from Pilot.PilotLoggerTools import getUniqueIDAndSaveToFile - from Pilot.MessageSender import messageSenderFactory -except ImportError: - from PilotLoggerTools import generateDict, encodeMessage - from PilotLoggerTools import generateTimeStamp - from PilotLoggerTools import isMessageFormatCorrect - from PilotLoggerTools import readPilotJSONConfigFile - from PilotLoggerTools import getUniqueIDAndSaveToFile - from MessageSender import messageSenderFactory -############################ - - -def getPilotUUIDFromFile(filename="PilotUUID"): - """Retrieves Pilot UUID from the file of given name. - Returns: - str: empty string in case of errors. - """ - - try: - with open(filename, "r") as myFile: - uniqueId = myFile.read() - return uniqueId - except IOError: - logging.error("Could not open the file with UUID:" + filename) - return "" - - -def addMissingConfiguration(config, defaultConfig=None): - """Creates new dict which contains content of config with added missing keys - and values defined in defaultConfig. - If a key from defaultConfig is absent in config set, the value,key pair is added. - If a key is present but the value is None, then the value from defaultConfig is assigned. - The default config contains the following structure: - {'LoggingType':'LOCAL_FILE','LocalOutputFile': 'myLocalQueueOfMessages', 'FileWithID': 'PilotUUID'} - Args: - config(dict): - defaultConfig(dict): - Returns: - dict: - """ - if defaultConfig is None: - defaultConfig = { - "LoggingType": "LOCAL_FILE", - "LocalOutputFile": "myLocalQueueOfMessages", - "FileWithID": "PilotUUID", - } - if not config or not isinstance(config, dict): - return defaultConfig - - currConfig = config.copy() - for k, v in defaultConfig.items(): - if k not in currConfig: - currConfig[k] = v - else: - if currConfig[k] is None: - currConfig[k] = v - return currConfig - - -class PilotLogger(object): - """Base pilot logger class.""" - - STATUSES = ["info", "warning", "error", "debug"] - - def __init__( - self, - configFile="pilot.json", - messageSenderType="LOCAL_FILE", - localOutputFile="myLocalQueueOfMessages", - fileWithUUID="PilotUUID", - setup="DIRAC-Certification", - ): - """ctr loads the configuration parameters from the json file - or if the file does not exists, loads the default set - of values. Next, if self.fileWithUUID is not set (this - variable corresponds to the name of the file with Pilot - Agent ID) the default value is used, and if the file does - not exist, the Pilot ID is created and saved in this file. - Args: - configFile(str): Name of the file with the configuration parameters. - messageSenderType(str): Type of the message sender to use e.g. to a REST interface, - to a message queue or to a local file. - localOutputFile(str): Name of the file that can be used to store the log messages locally. - fileWithUUID(str): Name of the file used to store the Pilot identifier. - """ - logging.debug("In init of PilotLogger") - self.STATUSES = PilotLogger.STATUSES - - self.params = addMissingConfiguration( - config=readPilotJSONConfigFile(configFile, setup), - defaultConfig={ - "LoggingType": messageSenderType, - "LocalOutputFile": localOutputFile, - "FileWithID": fileWithUUID, - }, - ) - - fileWithID = self.params["FileWithID"] - if os.path.isfile(fileWithID): - logging.warning("The file: " + fileWithID + " already exists. The content will be used to get UUID.") - else: - result = getUniqueIDAndSaveToFile(filename=fileWithID) - if not result: - logging.error("Error while generating pilot logger id.") - self.messageSender = messageSenderFactory(senderType=self.params["LoggingType"], params=self.params) - if not self.messageSender: - logging.error("Something went wrong - no messageSender created.") - - def _isCorrectStatus(self, status): - """Checks if the flag corresponds to one of the predefined - STATUSES, check constructor for current set. - """ - return status in self.STATUSES - - def sendMessage(self, messageContent, source="unspecified", phase="unspecified", status="info"): - """Sends the message after - creating the correct format: - including content, timestamp, status, source, phase and the uuid - of the pilot. - Returns: - bool: False in case of any errors, True otherwise - """ - logging.debug("In sendMessage of PilotLogger") - if not self._isCorrectStatus(status): - logging.error("status: " + str(status) + " is not correct") - return False - myUUID = getPilotUUIDFromFile(self.params["FileWithID"]) - message = generateDict(myUUID, generateTimeStamp(), source, phase, status, messageContent) - if not isMessageFormatCorrect(message): - logging.warning("Message format is not correct.") - return False - encodedMsg = encodeMessage(message) - return self.messageSender.sendMessage(encodedMsg, flag=status) - - -def main(): - """main() function is used to send a message - before any DIRAC related part is installed. - Remember that it is assumed that the PilotUUID was - already generated and stored into some file. - """ - - def singleWord(arg): - if len(arg.split()) != 1: - msg = "argument must be single word" - raise ArgumentTypeError(msg) - return arg - - parser = ArgumentParser( - description="command line interface to send logs to MQ system.", - formatter_class=RawTextHelpFormatter, - epilog="examples:\n" - + " python PilotLogger.py InstallDIRAC installing info My message\n" - + " python PilotLogger.py InstallDIRAC installing debug Debug message\n" - + ' python PilotLogger.py "My message"\n' - + ' python PilotLogger.py "My message" --output myFileName\n', - ) - - parser.add_argument( - "source", - type=singleWord, - nargs="?", - default="unspecified", - help='Source of the message e.g. "InstallDIRAC". It must be one word. ' - + 'If not specified it is set to "unspecified".', - ) - parser.add_argument( - "phase", - type=singleWord, - nargs="?", - default="unspecified", - help='Phase of the process e.g. "fetching". It must be one word. ' - + 'If not specified it is set to "unspecified".', - ) - parser.add_argument( - "status", - nargs="?", - choices=PilotLogger.STATUSES, - default="info", - help="Allowed values are: " + ", ".join(PilotLogger.STATUSES) + '. If not specified it is set to "info".', - metavar="status ", - ) - parser.add_argument("message", nargs="+", help="Human readable content of the message. ") - parser.add_argument( - "--output", help="Log the content to the specified file" + " instead of sending it to the Message Queue server." - ) - args = parser.parse_args() - - if len(" ".join(args.message)) >= 200: - raise ArgumentTypeError("message must be less than 200 characters") - if args.output: - logger = PilotLogger(localOutputFile=args.output) - else: - logger = PilotLogger() - logger.sendMessage(messageContent=" ".join(args.message), source=args.source, phase=args.phase, status=args.status) - - -if __name__ == "__main__": - main() diff --git a/Pilot/PilotLoggerTools.py b/Pilot/PilotLoggerTools.py deleted file mode 100644 index b34f7483..00000000 --- a/Pilot/PilotLoggerTools.py +++ /dev/null @@ -1,280 +0,0 @@ -"""A set of tools for the remote pilot agent logging system -""" - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import sys -import os -import logging -import time -import json -from uuid import uuid1 - -############################ -# python 2 -> 3 "hacks" -try: - basestring -except NameError: - basestring = str -############################ - - -__RCSID__ = "$Id$" - - -def createPilotLoggerConfigFile( - filename="PilotLogger.json", - loggingType="", - localOutputFile="", - host="", - port="", - url="", - key_file="", - cert_file="", - ca_certs="", - fileWithID="", - queue=None, - setup="DIRAC-Certification", -): - """Helper function that creates a test configuration file. - The format is json encoded file. - The created file can be mainy used for testing of PilotLogger setups, since - the included parameters are related only to communication settings, and many - other parameters are not present. - Arguments: - queue(dict): e.g. {"lhcb.test.*":{"Persitent":"False", "Ackonwledgement":"False"}} - """ - if queue is None: - queue = {} - keys = [ - "LoggingType", - "LocalOutputFile", - "Host", - "Port", - "Url", - "HostKey", - "HostCertificate", - "CACertificate", - "FileWithID", - "Queue", - ] - values = [loggingType, localOutputFile, host, port, url, key_file, cert_file, ca_certs, fileWithID, queue] - config = dict(zip(keys, values)) - content = dict() - content["Setups"] = {} - content["Setups"][setup] = {} - content["Setups"][setup]["Logging"] = config - config = json.dumps(content) - with open(filename, "w") as myFile: - myFile.write(config) - - -def readPilotJSONConfigFile(filename, setup="DIRAC-Certification"): - """Helper function that loads configuration settings from a pilot json file. - It is assumed that the json file contains the section "Logging" embedded in - the following form: - { - "Setups": { - "DIRAC-Certification": { - "Logging": { - } - } - } - } - Only information from Logging section are considered. If any of the - corresponding key is missing, then the None is a assigned - Args: - str(filename): - Returns: - dict: with the following keys (not all are obligatory): - 'LoggingType', 'LocalOutputFile', 'Host','Port','QueuePath','HostKey','HostCertificate','Url', - 'CACertificate','FileWithID' or None in case of errors. - """ - pilotJSON = None - try: - with open(filename, "r") as myFile: - pilotJSON = json.load(myFile) - except (IOError, ValueError): - logging.warning("Could not open or load the configuration file:" + filename) - return None - try: - partial = pilotJSON["Setups"][setup]["Logging"] - except KeyError: - logging.error("Loaded data does not have the correct section format") - return None - keys = [ - "LoggingType", - "LocalOutputFile", - "Host", - "Port", - "Url", - "HostKey", - "HostCertificate", - "CACertificate", - "FileWithID", - ] - config = dict((k, partial.get(k)) for k in keys) - # two special cases: - try: - config["QueuePath"] = "/queue/" + next(iter(partial.get("Queue"))) - except TypeError: - config["QueuePath"] = None - - if config["FileWithID"] is None: - config["FileWithID"] = "PilotUUID" - - return config - - -def generateDict(pilotUUID, timestamp, source, phase, status, messageContent): - """Helper function that returs a dictionnary based on the - set of input values. - Returns - dict: - """ - - keys = ["pilotUUID", "timestamp", "source", "phase", "status", "messageContent"] - values = [pilotUUID, timestamp, source, phase, status, messageContent] - return dict(zip(keys, values)) - - -def encodeMessage(content): - """Method encodes the message in form of the serialized JSON string - see https://docs.python.org/2/library/json.html#py-to-json-table - Args: - content(dict): - Returns: - str: in the JSON format. - Raises: - TypeError:if cannont encode json properly - """ - return json.dumps(content) - - -def decodeMessage(msgJSON): - """Decodes the message from the serialized JSON string - See https://docs.python.org/2/library/json.html#py-to-json-table. - Args: - msgJSON(str):in the JSON format. - Returns: - str: decoded objecst. - Raises: - TypeError: if cannot decode JSON properly. - """ - return json.loads(msgJSON) - - -def isMessageFormatCorrect(content): - """Checks if input format is correct. - Function checks if the input format is a dictionnary - in the following format: - 0) content is a dictionary, - 1) it contains only those keys of basestring types: - 'pilotUUID', 'status', 'messageContent', 'timestamp', 'source','phase' - 2) it contains only values of basestring types. - Args: - content(dict): all values must be non-empty - Returns: - bool: True if message format is correct, False otherwise - Example: - {"status": "info", - "timestamp": "1427121370.7", - "messageContent": "Uname = Linux localhost 3.10.64-85.cernvm.x86_64", - "pilotUUID": "eda78924-d169-11e4-bfd2-0800275d1a0a", - "phase": "Installing", - "source": "InstallDIRAC" - } - """ - if not isinstance(content, dict): - return False - refKeys = sorted(["pilotUUID", "status", "messageContent", "timestamp", "phase", "source"]) - keys = sorted(content.keys()) - if not keys == refKeys: - return False - values = content.values() - # if any value is not of basestring type - if any(not isinstance(val, basestring) for val in values): - return False - # checking if all elements are not empty - if any(not val for val in values): - return False - return True - - -def generateTimeStamp(): - """Generates the current timestamp in Epoch format. - Returns: - str: with number of seconds since the Epoch. - """ - return str(time.time()) - - -def generateUniqueID(): - """Generates a unique identifier based on uuid1 function - Returns: - str: containing uuid - """ - return str(uuid1()) - - -def getUniqueIDAndSaveToFile(filename="PilotUUID"): - """Generates the unique id and writes it to a file - of given name. - First, we try to receive the UUID from the OS, if that fails - the local uuid is generated. - Args: - filename(str): file to which the UUID will be saved - Returns: - bool: True if everything went ok False if there was an error with the file - """ - myId = getUniqueIDFromOS() - if not myId: - myId = generateUniqueID() - try: - with open(filename, "w") as myFile: - myFile.write(myId) - return True - except IOError: - logging.error("could not open file") - return False - - -def getUniqueIDFromOS(): - """Retrieves unique identifier based on specific OS. - The OS type is identified based on some predefined - environmental variables that should contain identifiers - for given node. For VM the combination of 3 variables is used to - create the identifier. Only the first found identifier is returned - Returns: - str: If variable(s) found the generated identifier is returned. Empty - string is returned if all checks fails. If there are more than one - valid identifier, only the first one is returned. - """ - # VM case: vm://$CE_NAME/$CE_NAME:$VMTYPE:$VM_UUID - vmEnvVars = ["CE_NAME", "VMTYPE", "VM_UUID"] - if all(var in os.environ for var in vmEnvVars): - ce_name = os.environ.get("CE_NAME") - partial_id = ":".join((os.environ.get(var) for var in vmEnvVars)) - return "vm://" + ce_name + "/" + partial_id - # Other cases: $envVar - envVars = ["CREAM_JOBID", "GRID_GLOBAL_JOBID"] - ids = (os.environ.get(var) for var in envVars if var in os.environ) - return next(ids, "") - - -def main(): - """Is used to generate the pilot uuid - and save it to a file even - before any DIRAC related part is installed. - """ - filename = " ".join(sys.argv[1:]) - if not filename: - getUniqueIDAndSaveToFile() - else: - getUniqueIDAndSaveToFile(filename) - - -if __name__ == "__main__": - main() diff --git a/Pilot/dirac-pilot.py b/Pilot/dirac-pilot.py index ac095727..c9e03ba6 100644 --- a/Pilot/dirac-pilot.py +++ b/Pilot/dirac-pilot.py @@ -30,21 +30,37 @@ ############################ # python 2 -> 3 "hacks" + +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO + try: - from Pilot.pilotTools import Logger, pythonPathCheck, PilotParams, getCommand + from Pilot.pilotTools import Logger, RemoteLogger, pythonPathCheck, PilotParams, getCommand except ImportError: - from pilotTools import Logger, pythonPathCheck, PilotParams, getCommand + from pilotTools import Logger, RemoteLogger, pythonPathCheck, PilotParams, getCommand ############################ if __name__ == "__main__": pilotStartTime = int(time.time()) - log = Logger("Pilot", debugFlag=True) - + sys.stdout, oldstdout = StringIO(), sys.stdout pilotParams = PilotParams() - if pilotParams.debugFlag: - log.setDebug() + sys.stdout, buffer = oldstdout, sys.stdout + bufContent = buffer.getvalue() + buffer.close() + sys.stdout.write(bufContent) + + if pilotParams.pilotLogging: + log = RemoteLogger( + pilotParams.loggerURL, "Pilot", pilotUUID=pilotParams.pilotUUID, debugFlag=pilotParams.debugFlag + ) + log.buffer.write(bufContent) + else: + log = Logger("Pilot", debugFlag=pilotParams.debugFlag) + if pilotParams.keepPythonPath: pythonPathCheck() else: @@ -63,12 +79,17 @@ log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions)) log.info("Executing commands: %s" % str(pilotParams.commands)) + if pilotParams.pilotLogging: + log.buffer.flush() for commandName in pilotParams.commands: command, module = getCommand(pilotParams, commandName, log) if command is not None: - log.info("Command %s instantiated from %s" % (commandName, module)) + command.log.info("Command %s instantiated from %s" % (commandName, module)) command.execute() else: log.error("Command %s could not be instantiated" % commandName) + # send the last message and abandon ship. + if pilotParams.pilotLogging: + log.buffer.flush() sys.exit(-1) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index c79b7b5b..05343f30 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -54,6 +54,41 @@ def __init__(self, pilotParams): ############################ +def logFinalizer(func): + """ + PilotCammand decorator. It marks a log file as final so no more messages should be written to it . + Finalising is triggered by a return statement or any sys.exit() call, so a file might be incomplete + if a command throws SystemExit exception with a code =! 0. + + :param func: method to be decorated + :type func: method object + :return: None + :rtype: None + """ + + def wrapper(self): + + if not self.log.isPilotLoggerOn: + self.log.debug("Remote logger is not active, not log flushing performed") + return func(self) + + try: + return func(self) + except SystemExit as exCode: # or Exception ? + pRef = self.pp.pilotReference + self.log.info( + "Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode)) + ) + raise + finally: + try: + self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit() and return. + except Exception as exc: + self.log.error("Remote logger couldn't be finalised %s " % str(exc)) + + return wrapper + + class GetPilotVersion(CommandBase): """Now just returns what was obtained by pilotTools.py""" @@ -61,6 +96,7 @@ def __init__(self, pilotParams): """c'tor""" super(GetPilotVersion, self).__init__(pilotParams) + @logFinalizer def execute(self): """Just returns what was obtained by pilotTools.py""" return self.releaseVersion @@ -73,6 +109,7 @@ def __init__(self, pilotParams): """c'tor""" super(CheckWorkerNode, self).__init__(pilotParams) + @logFinalizer def execute(self): """Get host and local user info, and other basic checks, e.g. space available""" @@ -361,6 +398,7 @@ def _installDIRACpy3(self): self.log.error("Could not pip install %s [ERROR %d]" % (self.releaseVersion, retCode)) self.exitWithError(retCode) + @logFinalizer def execute(self): """What is called all the time""" if self.pp.pythonVersion == "27": @@ -409,6 +447,7 @@ def __init__(self, pilotParams): super(ConfigureBasics, self).__init__(pilotParams) self.cfg = [] + @logFinalizer def execute(self): """What is called all the times. @@ -484,6 +523,7 @@ def __init__(self, pilotParams): # and that will fill the local dirac.cfg file self.cfg = [] + @logFinalizer def execute(self): """Setup CE/Queue Tags and other relevant parameters.""" @@ -557,6 +597,7 @@ def __init__(self, pilotParams): super(CheckWNCapabilities, self).__init__(pilotParams) self.cfg = [] + @logFinalizer def execute(self): """Discover NumberOfProcessors and RAM""" @@ -667,6 +708,7 @@ def __init__(self, pilotParams): # and that will fill the local dirac.cfg file self.cfg = [] + @logFinalizer def execute(self): """Setup configuration parameters""" self.__setFlavour() @@ -832,6 +874,7 @@ class ConfigureArchitecture(CommandBase): Separated from the ConfigureDIRAC command for easier extensibility. """ + @logFinalizer def execute(self): """This is a simple command to call the dirac-platform utility to get the platform, and add it to the configuration @@ -888,6 +931,7 @@ def __init__(self, pilotParams): """c'tor""" super(ConfigureCPURequirements, self).__init__(pilotParams) + @logFinalizer def execute(self): """Get job CPU requirement and queue normalization""" # Determining the CPU normalization factor and updating pilot.cfg with it @@ -1074,6 +1118,7 @@ def __startJobAgent(self): diskSpace = int(fs[4] * fs[0] / 1024 / 1024) self.log.info("DiskSpace (MB) = %s" % diskSpace) + @logFinalizer def execute(self): """What is called all the time""" self.__setInnerCEOpts() @@ -1279,6 +1324,7 @@ def __parseJobAgentLog(self, logFile): return shutdownMessage + @logFinalizer def execute(self): """What is called all the time""" self.__setInProcessOpts() @@ -1396,6 +1442,7 @@ def _runNagiosProbes(self): "PUT of %s Nagios output fails with %d %s" % (probeCmd, result.status, result.reason) ) + @logFinalizer def execute(self): """Standard entry point to a pilot command""" self._setNagiosOptions() diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index ca81c850..4140659a 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -8,16 +8,18 @@ __RCSID__ = "$Id$" import sys -import time import os import pickle import getopt import imp import json import re +import select import signal import subprocess -import select +import ssl +from datetime import datetime +from functools import partial from distutils.version import LooseVersion ############################ @@ -25,19 +27,21 @@ try: from urllib.request import urlopen from urllib.error import HTTPError, URLError + from urllib.parse import urlencode except ImportError: from urllib2 import urlopen, HTTPError, URLError + from urllib import urlencode + +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO try: basestring except NameError: basestring = str -try: - from Pilot.PilotLogger import PilotLogger -except ImportError: - from PilotLogger import PilotLogger -############################ # Utilities functions @@ -262,18 +266,27 @@ def __init__(self, name="Pilot", debugFlag=False, pilotOutput="pilot.out"): self.debugFlag = debugFlag self.name = name self.out = pilotOutput + self._headerTemplate = "{datestamp} {{level}} [{name}] {{message}}" + + @property + def messageTemplate(self): + """ + Message template in ISO-8601 format. + + :return: template string + :rtype: str + """ + return self._headerTemplate.format( + datestamp=datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + name=self.name, + ) def __outputMessage(self, msg, level, header): if self.out: with open(self.out, "a") as outputFile: for _line in str(msg).split("\n"): if header: - outLine = "%s UTC %s [%s] %s" % ( - time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), - level, - self.name, - _line, - ) + outLine = self.messageTemplate.format(level=level, message=_line) print(outLine) if self.out: outputFile.write(outLine + "\n") @@ -300,74 +313,199 @@ def info(self, msg, header=True): self.__outputMessage(msg, "INFO", header) -class ExtendedLogger(Logger): - """The logger object, for use inside the pilot. It prints messages. - But can be also used to send messages to the queue +class RemoteLogger(Logger): + """ + The remote logger object, for use inside the pilot. It prints messages, + but can be also used to send messages to an external service. """ def __init__( - self, name="Pilot", debugFlag=False, pilotOutput="pilot.out", isPilotLoggerOn=True, setup="DIRAC-Certification" + self, + url, + name="Pilot", + debugFlag=False, + pilotOutput="pilot.out", + isPilotLoggerOn=True, + pilotUUID="unknown", + setup="DIRAC-Certification", ): - """c'tor + """ + c'tor If flag PilotLoggerOn is not set, the logger will behave just like the original Logger object, that means it will just print logs locally on the screen """ - super(ExtendedLogger, self).__init__(name, debugFlag, pilotOutput) - if isPilotLoggerOn: - self.pilotLogger = PilotLogger(setup=setup) - else: - self.pilotLogger = None + super(RemoteLogger, self).__init__(name, debugFlag, pilotOutput) + self.url = url self.isPilotLoggerOn = isPilotLoggerOn + sendToURL = partial(sendMessage, url, pilotUUID) + self.buffer = FixedSizeBuffer(sendToURL) def debug(self, msg, header=True, sendPilotLog=False): - super(ExtendedLogger, self).debug(msg, header) - if self.isPilotLoggerOn and sendPilotLog: - self.pilotLogger.sendMessage(msg, status="debug") + super(RemoteLogger, self).debug(msg, header) + if ( + self.isPilotLoggerOn and self.debugFlag + ): # the -d flag activates this debug flag in CommandBase via PilotParams + self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg)) def error(self, msg, header=True, sendPilotLog=False): - super(ExtendedLogger, self).error(msg, header) - if self.isPilotLoggerOn and sendPilotLog: - self.pilotLogger.sendMessage(msg, status="error") + super(RemoteLogger, self).error(msg, header) + if self.isPilotLoggerOn: + self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg)) def warn(self, msg, header=True, sendPilotLog=False): - super(ExtendedLogger, self).warn(msg, header) - if self.isPilotLoggerOn and sendPilotLog: - self.pilotLogger.sendMessage(msg, status="warning") + super(RemoteLogger, self).warn(msg, header) + if self.isPilotLoggerOn: + self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg)) def info(self, msg, header=True, sendPilotLog=False): - super(ExtendedLogger, self).info(msg, header) - if self.isPilotLoggerOn and sendPilotLog: - self.pilotLogger.sendMessage(msg, status="info") + super(RemoteLogger, self).info(msg, header) + if self.isPilotLoggerOn: + self.sendMessage(self.messageTemplate.format(level="INFO", message=msg)) + + def sendMessage(self, msg): + """ + Buffered message sender. - def sendMessage(self, msg, source, phase, status="info", sendPilotLog=True): - if self.isPilotLoggerOn and sendPilotLog: - self.pilotLogger.sendMessage(messageContent=msg, source=source, phase=phase, status=status) + :param msg: message to send + :type msg: str + :return: None + :rtype: None + """ + try: + self.buffer.write(msg + "\n") + except Exception as err: + super(RemoteLogger, self).error("Message not sent") + super(RemoteLogger, self).error(str(err)) + + +class FixedSizeBuffer(object): + """ + A buffer with a (preferred) fixed number of lines. + Once it's full, a message is sent to a remote server and the buffer is renewed. + """ + + def __init__(self, senderFunc, bufsize=10): + """ + Constructor. + + :param senderFunc: a function used to send a message + :type senderFunc: func + :param bufsize: size of the buffer (in lines) + :type bufsize: int + """ + self.output = StringIO() + self.bufsize = bufsize + self.__nlines = 0 + self.senderFunc = senderFunc + + def write(self, text): + """ + Write text to a string buffer. Newline characters are counted and number of lines in the buffer + is increased accordingly. + + :param text: text string to write + :type text: str + :return: None + :rtype: None + """ + # reopen the buffer in a case we had to flush a partially filled buffer + if self.output.closed: + self.output = StringIO() + self.output.write(text) + self.__nlines += max(1, text.count("\n")) + self.sendFullBuffer() + + def getValue(self): + content = self.output.getvalue() + return content + + def sendFullBuffer(self): + """ + Get the buffer content, send a message, close the current buffer and re-create a new one for subsequent writes. + + """ + + if self.__nlines >= self.bufsize: + self.flush() + self.output = StringIO() + + def flush(self): + """ + Flush the buffer and send log records to a remote server. The buffer is closed as well. + + :return: None + :rtype: None + """ + + self.output.flush() + buf = self.getValue() + self.senderFunc(buf) + self.__nlines = 0 + self.output.close() + + +def sendMessage(url, pilotUUID, rawMessage): + """ + Send a message to Tornado. + + :param url: + :type url: + :param pilotUUID: + :type pilotUUID: + :param rawMessage: + :type rawMessage: + :return: + :rtype: + """ + + message = json.dumps((json.dumps(rawMessage), pilotUUID)) + major, minor, micro, _, _ = sys.version_info + if major >= 3: + data = urlencode({"method": "sendMessage", "args": message}).encode("utf-8") # encode to bytes ! for python3 + else: + data = urlencode({"method": "sendMessage", "args": message}) + caPath = os.getenv("X509_CERT_DIR") + cert = os.getenv("X509_USER_PROXY") + + context = ssl.create_default_context() + context.load_verify_locations(capath=caPath) + context.load_cert_chain(cert) + res = urlopen(url, data, context=context) + res.close() class CommandBase(object): """CommandBase is the base class for every command in the pilot commands toolbox""" def __init__(self, pilotParams, dummy=""): - """c'tor + """ + Defines the classic pilot logger and the pilot parameters. + Debug level of the Logger is controlled by the -d flag in pilotParams. - Defines the logger and the pilot parameters + :param pilotParams: a dictionary of pilot parameters. + :type pilotParams: dict + :param dummy: """ + self.pp = pilotParams - self.log = ExtendedLogger( - name=self.__class__.__name__, - debugFlag=False, - pilotOutput="pilot.out", - isPilotLoggerOn=self.pp.pilotLogging, - setup=self.pp.setup, - ) - # self.log = Logger( self.__class__.__name__ ) - self.debugFlag = False - for o, _ in self.pp.optList: - if o == "-d" or o == "--debug": - self.log.setDebug() - self.debugFlag = True - self.log.debug("\n\n Initialized command %s" % self.__class__) + isPilotLoggerOn = pilotParams.pilotLogging + self.debugFlag = pilotParams.debugFlag + loggerURL = pilotParams.loggerURL + + if loggerURL is None: + self.log = Logger(self.__class__.__name__, debugFlag=self.debugFlag) + else: + # remote logger + self.log = RemoteLogger( + loggerURL, self.__class__.__name__, pilotUUID=pilotParams.pilotUUID, debugFlag=self.debugFlag + ) + + self.log.isPilotLoggerOn = isPilotLoggerOn + if self.debugFlag: + self.log.setDebug() + self.log.debug("Initialized command %s" % self.__class__.__name__) + self.log.debug("pilotParams option list: %s" % self.pp.optList) self.cfgOptionDIRACVersion = self._getCFGOptionDIRACVersion() def _getCFGOptionDIRACVersion(self): @@ -401,7 +539,7 @@ def ascii_filter(in_chr): if ord(in_chr) < 128: return in_chr else: - return '' + return "" outData = "" isRunning = True @@ -413,7 +551,7 @@ def ascii_filter(in_chr): for stream in readfd: # ignore codepoint splitting problems; not worth it outChunk = stream.read(1024).decode("ascii", "replace") - outChunk = ''.join(filter(ascii_filter, outChunk)) + outChunk = "".join(filter(ascii_filter, outChunk)) if not outChunk: # file has reached EOF, program finished isRunning = False @@ -426,11 +564,17 @@ def ascii_filter(in_chr): else: sys.stdout.write(outChunk) sys.stdout.flush() + # add outChunk to an existing buffer of the remote logger, if enabled. + if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: + self.log.buffer.write(outChunk) outData += outChunk # Ensure output ends on a newline sys.stdout.write("\n") sys.stdout.flush() + if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: + if not self.log.buffer.getValue().endswith("\n"): + self.log.buffer.write("\n") sys.stderr.write("\n") sys.stderr.flush() @@ -560,6 +704,8 @@ def __init__(self): self.certsLocation = "%s/etc/grid-security" % self.workingDir self.pilotCFGFile = "pilot.json" self.pilotLogging = False + self.loggerURL = None + self.pilotUUID = "unknown" self.modules = "" # see dirac-install "-m" option documentation self.userEnvVariables = "" # see dirac-install "--userEnvVariables" option documentation self.pipInstallOptions = "" @@ -581,6 +727,7 @@ def __init__(self): ("c", "cert", "Use server certificate instead of proxy"), ("d", "debug", "Set debug flag"), ("e:", "extraPackages=", "Extra packages to install (comma separated)"), + ("g:", "loggerURL=", "Remote Logger service URL"), ("h", "help", "Show this help"), ("k", "keepPP", "Do not clear PYTHONPATH on start"), ("l:", "project=", "Project to install"), @@ -625,6 +772,7 @@ def __init__(self): ("Z:", "commandOptions=", "Options parsed by command modules"), ("", "pythonVersion=", "Python version of DIRAC client to install"), ("", "defaultsURL=", "user-defined URL for global config"), + ("", "pilotUUID=", "pilot UUID"), ) # Possibly get Setup and JSON URL/filename from command line @@ -739,6 +887,10 @@ def __initCommandLine2(self): pass elif o == "-z" or o == "--pilotLogging": self.pilotLogging = True + elif o == "-g" or o == "--loggerURL": + self.loggerURL = v + elif o == "--pilotUUID": + self.pilotUUID = v elif o in ("-o", "--option"): self.genericOption = v elif o in ("-t", "--tag"): diff --git a/Pilot/tests/Test_MessageSender.py b/Pilot/tests/Test_MessageSender.py deleted file mode 100644 index d9a58f87..00000000 --- a/Pilot/tests/Test_MessageSender.py +++ /dev/null @@ -1,144 +0,0 @@ -""" Unit tests for MessageSender -""" - -from __future__ import absolute_import, division, print_function - -# pylint: disable=protected-access, missing-docstring, invalid-name, line-too-long - -import sys -import unittest -import os -from mock import MagicMock -from Pilot.MessageSender import LocalFileSender, StompSender, RESTSender, eraseFileContent, loadAndCreateObject -import Pilot.MessageSender as module - -############################ -# python 2 -> 3 "hacks" -try: - ModuleNotFoundError -except NameError: - ModuleNotFoundError = ImportError -############################ - - -def removeFile(filename): - try: - os.remove(filename) - except OSError: - pass - - -class TestMessageSenderEraseFileContent(unittest.TestCase): - def setUp(self): - self.testFile = "someStrangeFile" - with open(self.testFile, "a"): - os.utime(self.testFile, None) - - def tearDown(self): - removeFile(self.testFile) - - def test_success(self): - try: - eraseFileContent(self.testFile) - except BaseException: - self.fail("eraseFileContent() raised ExceptionType!") - - -class TestLoadAndCreateObject(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def test_success(self): - res = loadAndCreateObject("Pilot.MessageSender", "LocalFileSender", {"LocalOutputFile": "blabla"}) - self.assertTrue(res) - - def test_fail(self): - self.assertRaises(ModuleNotFoundError, loadAndCreateObject, "Bla.Bla", "NonExistingClass", "") - - -class TestLocalFileSender(unittest.TestCase): - def setUp(self): - self.testFile = "someLocalQueueOfMessages" - self.testMessage = "my test message" - removeFile(self.testFile) - - def tearDown(self): - removeFile(self.testFile) - - def test_success(self): - msgSender = LocalFileSender({"LocalOutputFile": self.testFile}) - res = msgSender.sendMessage(self.testMessage, "info") - self.assertTrue(res) - lineFromFile = "" - with open(self.testFile, "r") as myFile: - lineFromFile = next(myFile) - self.assertEqual(self.testMessage + "\n", lineFromFile) - - def test_failure_badParams(self): - self.assertRaises(ValueError, LocalFileSender, {"blabl": "bleble"}) - - -class TestStompSender(unittest.TestCase): - def setUp(self): - self.testFile = "myFile" - self.testMessage = "my test message" - module.stomp = MagicMock() - module.stomp.Connection = MagicMock() - connectionMock = MagicMock() - connectionMock.is_connected.return_value = True - module.stomp.Connection.return_value = connectionMock - - def tearDown(self): - removeFile(self.testFile) - - def test_success(self): - params = { - "HostKey": "key", - "HostCertificate": "cert", - "CACertificate": "caCert", - "Host": "test.host.ch", - "Port": "666", - "QueuePath": "/queue/myqueue", - "LocalOutputFile": self.testFile, - } - msgSender = StompSender(params) - res = msgSender.sendMessage(self.testMessage, "info") - self.assertTrue(res) - - def test_failure_badParams(self): - self.assertRaises(ValueError, StompSender, {"blabl": "bleble"}) - - -class TestRESTSender(unittest.TestCase): - def setUp(self): - self.testFile = "myFile" - self.testMessage = "my test message" - module.requests = MagicMock() - module.requests.post = MagicMock() - - def test_success(self): - params = { - "HostKey": "key", - "HostCertificate": "cert", - "CACertificate": "caCert", - "Url": "https://some.host.ch/messages", - "LocalOutputFile": self.testFile, - } - msgSender = RESTSender(params) - res = msgSender.sendMessage(self.testMessage, "info") - self.assertTrue(res) - - def test_failure_badParams(self): - self.assertRaises(ValueError, RESTSender, {"blabl": "bleble"}) - - -if __name__ == "__main__": - suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestLocalFileSender) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestStompSender)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestRESTSender)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestMessageSenderEraseFileContent)) - testResult = unittest.TextTestRunner(verbosity=2).run(suite) - sys.exit(not testResult.wasSuccessful()) diff --git a/Pilot/tests/Test_PilotLogger.py b/Pilot/tests/Test_PilotLogger.py deleted file mode 100644 index 24c4cda8..00000000 --- a/Pilot/tests/Test_PilotLogger.py +++ /dev/null @@ -1,114 +0,0 @@ -""" Unit tests for PilotLogger -""" - -from __future__ import absolute_import, division, print_function - -# pylint: disable=protected-access, missing-docstring, invalid-name, line-too-long - -import sys -import unittest -import os -from Pilot.PilotLogger import PilotLogger, getPilotUUIDFromFile, addMissingConfiguration -from Pilot.PilotLoggerTools import getUniqueIDAndSaveToFile - - -class TestGetPilotUUIDFromFile(unittest.TestCase): - def setUp(self): - self.testFile = "UUID_to_store" - getUniqueIDAndSaveToFile(self.testFile) - self.badFile = "////" - self.nonExistentFile = "abrakadabraToCzaryIMagia" - - def tearDown(self): - try: - os.remove(self.testFile) - except OSError: - pass - - def test_success(self): - uuid = getPilotUUIDFromFile(self.testFile) - self.assertTrue(uuid) - - def test_failureBadFile(self): - uuid = getPilotUUIDFromFile(self.badFile) - self.assertFalse(uuid) - - def test_failureNonExistent(self): - uuid = getPilotUUIDFromFile(self.nonExistentFile) - self.assertFalse(uuid) - - -class TestPilotLogger_isCorrectStatus(unittest.TestCase): - def setUp(self): - self.uuidFile = "PilotUUID" - self.logger = PilotLogger() - - def tearDown(self): - try: - os.remove(self.uuidFile) - except OSError: - pass - - def test_success(self): - for status in self.logger.STATUSES: - self.assertTrue(self.logger._isCorrectStatus(status)) - - def test_failure(self): - self.assertFalse(self.logger._isCorrectStatus("mamma Mia")) - - def test_failureEmpty(self): - self.assertFalse(self.logger._isCorrectStatus("")) - - -class TestPilotLogger_init(unittest.TestCase): - def setUp(self): - self.uuidFile = "PilotUUID" - - def tearDown(self): - try: - os.remove(self.uuidFile) - except OSError: - pass - - def test_DefaultCtrNonJsonFile(self): - logger = PilotLogger() - self.assertEqual(logger.params["LoggingType"], "LOCAL_FILE") - self.assertEqual(logger.params["LocalOutputFile"], "myLocalQueueOfMessages") - self.assertEqual(logger.params["FileWithID"], "PilotUUID") - - -class TestPilotLogger_addMissingConfiguration(unittest.TestCase): - def setUp(self): - self.uuidFile = "PilotUUID" - - def tearDown(self): - try: - os.remove(self.uuidFile) - except OSError: - pass - - def test_success(self): - config = {"LoggingType": "MQ", "LocalOutputFile": "blabla", "FileWithID": "myUUUID"} - res = addMissingConfiguration(config) - self.assertEqual(res, config) - - def test_emptyConfig(self): - self.assertEqual( - addMissingConfiguration(None), - {"LoggingType": "LOCAL_FILE", "LocalOutputFile": "myLocalQueueOfMessages", "FileWithID": "PilotUUID"}, - ) - - -class TestPilotLogger_sendMessage(unittest.TestCase): - pass - # here some mocks needed - - -if __name__ == "__main__": - suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestGetPilotUUIDFromFile) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLogger_isCorrectStatus)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLogger_init)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLogger_addMissingConfiguration)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLogger_sendMessage)) - testResult = unittest.TextTestRunner(verbosity=2).run(suite) - sys.exit(not testResult.wasSuccessful()) diff --git a/Pilot/tests/Test_PilotLoggerTools.py b/Pilot/tests/Test_PilotLoggerTools.py deleted file mode 100644 index 92c9e9b7..00000000 --- a/Pilot/tests/Test_PilotLoggerTools.py +++ /dev/null @@ -1,325 +0,0 @@ -"""Unit tests for PilotLoggerTools -""" - -from __future__ import absolute_import, division, print_function - -# pylint: disable=protected-access, missing-docstring, invalid-name, line-too-long - -import sys -import json -import os -import unittest -import mock -from Pilot.PilotLoggerTools import generateDict, encodeMessage -from Pilot.PilotLoggerTools import decodeMessage, isMessageFormatCorrect -from Pilot.PilotLoggerTools import getUniqueIDAndSaveToFile -from Pilot.PilotLoggerTools import createPilotLoggerConfigFile -from Pilot.PilotLoggerTools import getUniqueIDFromOS -from Pilot.PilotLoggerTools import readPilotJSONConfigFile - - -class TestPilotLoggerTools(unittest.TestCase): - def setUp(self): - self.msg = { - "status": "info", - "phase": "Installing", - "timestamp": "1427121370.7", - "messageContent": "Uname = Linux localhost 3.10.64-85.cernvm.x86_64", - "pilotUUID": "eda78924-d169-11e4-bfd2-0800275d1a0a", - "source": "InstallDIRAC", - } - self.testFile = "test_file_to_remove" - self.testFileCfg = "TestConf.json" - self.badFile = "////" - - def tearDown(self): - # for fileProd in [self.testFile, self.testFileCfg, 'PilotUUID']: - for fileProd in [self.testFile, "PilotUUID"]: - try: - os.remove(fileProd) - except OSError: - pass - - -class TestPilotLoggerToolsreadPilotJSONConfigFile(TestPilotLoggerTools): - def setUp(self): - jsonContent_MQ = """ - { - "Setups": { - "DIRAC-Certification": { - "Logging": { - "Queue": { - "test": { - "Persistent": "False", - "Acknowledgement": "False" - } - }, - "LoggingType":"MQ", - "LocalOutputFile":"myLocalQueueOfMessages", - "Host": "testMachineMQ.cern.ch", - "Port": "61614", - "HostKey": "/path/to/certs/hostkey.pem", - "HostCertificate": "/path/to/certs/hostcert.pem", - "CACertificate": "/path/to/certs/ca-bundle.crt" - } - } - }, - "DefaultSetup": "DIRAC-Certification" - } - """ - self.pilotJSON_MQ = "pilotMQ.json" - with open(self.pilotJSON_MQ, "w") as myF: - myF.write(jsonContent_MQ) - - jsonContent_REST = """ - { - "Setups": { - "DIRAC-Certification": { - "Logging": { - "LoggingType":"REST_API", - "LocalOutputFile":"myLocalQueueOfMessages", - "Url": "https://testMachineREST.cern.ch:666/msg", - "HostKey": "/path/to/certs/hostkey.pem", - "HostCertificate": "/path/to/certs/hostcert.pem", - "CACertificate": "/path/to/certs/ca-bundle.crt" - } - } - }, - "DefaultSetup": "DIRAC-Certification" - } - """ - self.pilotJSON_REST = "pilotREST.json" - with open(self.pilotJSON_REST, "w") as myF: - myF.write(jsonContent_REST) - - jsonContent_LOCAL = """ - { - "Setups": { - "DIRAC-Certification": { - "Logging": { - "LoggingType":"LOCAL_FILE", - "LocalOutputFile":"myLocalQueueOfMessages" - } - } - }, - "DefaultSetup": "DIRAC-Certification" - } - """ - self.pilotJSON_LOCAL = "pilotLOCAL.json" - with open(self.pilotJSON_LOCAL, "w") as myF: - myF.write(jsonContent_LOCAL) - - def tearDown(self): - for fileProd in [self.pilotJSON_MQ, self.pilotJSON_REST, self.pilotJSON_LOCAL]: - try: - os.remove(fileProd) - except OSError: - pass - - def test_success_MQ(self): - config = readPilotJSONConfigFile(self.pilotJSON_MQ) - host = "testMachineMQ.cern.ch" - port = 61614 - queuePath = "/queue/test" - key_file = "/path/to/certs/hostkey.pem" - cert_file = "/path/to/certs/hostcert.pem" - ca_certs = "/path/to/certs/ca-bundle.crt" - config = readPilotJSONConfigFile(self.pilotJSON_MQ) - - self.assertEqual(config["LoggingType"], "MQ") - self.assertEqual(config["LocalOutputFile"], "myLocalQueueOfMessages") - self.assertEqual(int(config["Port"]), port) - self.assertEqual(config["Host"], host) - self.assertEqual(config["QueuePath"], queuePath) - self.assertEqual(config["HostKey"], key_file) - self.assertEqual(config["HostCertificate"], cert_file) - self.assertEqual(config["CACertificate"], ca_certs) - self.assertEqual(config["FileWithID"], "PilotUUID") - - def test_success_REST(self): - config = readPilotJSONConfigFile(self.pilotJSON_REST) - url = "https://testMachineREST.cern.ch:666/msg" - key_file = "/path/to/certs/hostkey.pem" - cert_file = "/path/to/certs/hostcert.pem" - ca_certs = "/path/to/certs/ca-bundle.crt" - config = readPilotJSONConfigFile(self.pilotJSON_REST) - self.assertEqual(config["LoggingType"], "REST_API") - self.assertEqual(config["LocalOutputFile"], "myLocalQueueOfMessages") - self.assertEqual(config["Url"], url) - self.assertEqual(config["HostKey"], key_file) - self.assertEqual(config["HostCertificate"], cert_file) - self.assertEqual(config["CACertificate"], ca_certs) - self.assertEqual(config["FileWithID"], "PilotUUID") - - self.assertFalse(config["QueuePath"]) - - def test_success_LOCAL(self): - config = readPilotJSONConfigFile(self.pilotJSON_LOCAL) - self.assertEqual(config["LoggingType"], "LOCAL_FILE") - self.assertEqual(config["LocalOutputFile"], "myLocalQueueOfMessages") - self.assertEqual(config["FileWithID"], "PilotUUID") - - self.assertFalse(config["QueuePath"]) - self.assertFalse(config["Port"]) - self.assertFalse(config["Host"]) - self.assertFalse(config["HostKey"]) - self.assertFalse(config["HostCertificate"]) - self.assertFalse(config["CACertificate"]) - - def test_failure(self): - pass - - -class TestPilotLoggerToolsCreatePilotLoggerConfigFile(TestPilotLoggerTools): - def test_success(self): - loggingType = "MQ" - host = "127.0.0.1" - port = "61614" - url = "" - key_file = "certificates/client/key.pem" - cert_file = "certificates/client/cert.pem" - ca_certs = "certificates/testca/cacert.pem" - fileWithID = "PilotUUID_test" - queue = {"test.cern.ch": {}} - - createPilotLoggerConfigFile( - filename=self.testFileCfg, - loggingType=loggingType, - host=host, - port=port, - url=url, - key_file=key_file, - cert_file=cert_file, - ca_certs=ca_certs, - fileWithID=fileWithID, - queue=queue, - ) - with open(self.testFileCfg, "r") as myFile: - config = myFile.read() - config = json.loads(config) - partial = config["Setups"]["DIRAC-Certification"]["Logging"] - self.assertEqual(partial["LoggingType"], "MQ") - self.assertEqual(partial["Port"], port) - self.assertEqual(partial["Host"], host) - self.assertEqual(partial["Url"], url) - self.assertEqual(partial["HostKey"], key_file) - self.assertEqual(partial["HostCertificate"], cert_file) - self.assertEqual(partial["CACertificate"], ca_certs) - self.assertEqual(partial["FileWithID"], fileWithID) - self.assertEqual(partial["Queue"], queue) - - def test_failure(self): - pass - - -class TestPilotLoggerToolsGenerateDict(TestPilotLoggerTools): - def test_success(self): - result = generateDict( - pilotUUID="eda78924-d169-11e4-bfd2-0800275d1a0a", - timestamp="1427121370.7", - source="InstallDIRAC", - phase="Installing", - status="info", - messageContent="Uname = Linux localhost 3.10.64-85.cernvm.x86_64", - ) - - self.assertEqual(result, self.msg) - - def test_failure(self): - result = generateDict( - "eda78924-d169-11e4-bfd2-0800275d1a0a", - "1427121370.7", - "InstallDIRAC", - "AAA Installation", - "info", - "Uname = Linux localhost 3.10.64-85.cernvm.x86_64", - ) - self.assertNotEqual(result, self.msg) - - -class TestPilotLoggerToolsEncodeMessage(TestPilotLoggerTools): - def test_success(self): - result = encodeMessage(self.msg) - standJSON = json.dumps(self.msg) - - self.assertEqual(result, standJSON) - - def test_failure(self): - pass - - -class TestPilotLoggerToolsDecodeMessage(TestPilotLoggerTools): - def test_success(self): - standJSON = json.dumps(self.msg) - result = decodeMessage(standJSON) - self.assertEqual(result, self.msg) - - def test_cosistency(self): - result = decodeMessage(encodeMessage(self.msg)) - self.assertEqual(result, self.msg) - - def test_fail(self): - self.assertRaises(TypeError, decodeMessage, self.msg) - - -class TestPilotLoggerIsMessageFormatCorrect(TestPilotLoggerTools): - def test_success(self): - self.assertTrue(isMessageFormatCorrect(self.msg)) - - def test_notDict(self): - self.assertFalse(isMessageFormatCorrect(["a", 2])) - - def test_missingKey(self): - badDict = self.msg.copy() - badDict.pop("source", None) # removing one key - self.assertFalse(isMessageFormatCorrect(badDict)) - - def test_valuesNotStrings(self): - badDict = self.msg.copy() - badDict["source"] = 10 - self.assertFalse(isMessageFormatCorrect(badDict)) - - def test_someValuesAreEmpty(self): - badDict = self.msg.copy() - badDict["timestamp"] = "" - self.assertFalse(isMessageFormatCorrect(badDict)) - - -class TestPilotLoggerGetUniqueIDAndSaveToFile(TestPilotLoggerTools): - def test_success(self): - self.assertTrue(getUniqueIDAndSaveToFile(self.testFile)) - - def test_fail(self): - self.assertFalse(getUniqueIDAndSaveToFile(self.badFile)) - - -class TestPilotLoggerGetUniqueIDFromOS(TestPilotLoggerTools): - def test_successCREAM(self): - with mock.patch.dict(os.environ, {"CREAM_JOBID": "CREAM_uuid"}): - self.assertEqual(getUniqueIDFromOS(), "CREAM_uuid") - - def test_successGRID(self): - with mock.patch.dict(os.environ, {"GRID_GLOBAL_JOBID": "GRID_uuid"}): - self.assertEqual(getUniqueIDFromOS(), "GRID_uuid") - - def test_successVM(self): - with mock.patch.dict(os.environ, {"VM_UUID": "VM_uuid", "CE_NAME": "myCE", "VMTYPE": "myVMTYPE"}): - self.assertEqual(getUniqueIDFromOS(), "vm://myCE/myCE:myVMTYPE:VM_uuid") - - def test_failVM(self): - self.assertFalse(getUniqueIDFromOS()) - - -if __name__ == "__main__": - suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerTools) - - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerToolsreadPilotJSONConfigFile)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerToolsCreatePilotLoggerConfigFile)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerToolsGenerateDict)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerToolsEncodeMessage)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerToolsDecodeMessage)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerIsMessageFormatCorrect)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerGetUniqueIDAndSaveToFile)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestPilotLoggerGetUniqueIDFromOS)) - testResult = unittest.TextTestRunner(verbosity=2).run(suite) - sys.exit(not testResult.wasSuccessful()) diff --git a/Pilot/tests/Test_pilotTools.py b/Pilot/tests/Test_pilotTools.py deleted file mode 100644 index be43b959..00000000 --- a/Pilot/tests/Test_pilotTools.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Unit tests for pilotTools -""" - -from __future__ import absolute_import, division, print_function - -# pylint: skip-file - -import os -import json - -import pytest -from mock import MagicMock - -############################ -# python 2 -> 3 "hacks" -try: - import Queue as queue -except ImportError: - import queue -############################ - -from Pilot.pilotTools import ExtendedLogger, parseVersion - - -testOutputFile = "fakeQueueFile" - - -@pytest.fixture -def rmFiles(): - yield - for fr in ["PilotUUID", "PilotAgentUUID", "myLocalQueueOfMessages"]: - try: - os.remove(fr) - except OSError: - pass - - -@pytest.mark.parametrize( - "releaseVersion, useLegacyStyle, expected", - [ - ("invalid-version", True, "invalid-version"), - ("invalid-version", False, "invalid-version"), - # starting from legacy style - ("v10r2-pre9", True, "v10r2-pre9"), - ("v10r2-pre9", False, "10.2.0a9"), - ("v10r2", True, "v10r2"), - ("v10r2", False, "10.2.0"), - ("v10r2p0", True, "v10r2"), - ("v10r2p0", False, "10.2.0"), - ("v10r2p1", True, "v10r2p1"), - ("v10r2p1", False, "10.2.1"), - ("v10r2p10", True, "v10r2p10"), - ("v10r2p10", False, "10.2.10"), - ("v10r2p15", True, "v10r2p15"), - ("v10r2p15", False, "10.2.15"), - ("v10r3", True, "v10r3"), - ("v10r3", False, "10.3.0"), - ("v11r0-pre1", True, "v11r0-pre1"), - ("v11r0-pre1", False, "11.0.0a1"), - ("v11r0-pre12", True, "v11r0-pre12"), - ("v11r0-pre12", False, "11.0.0a12"), - ("v11r0", True, "v11r0"), - ("v11r0", False, "11.0.0"), - ("v11r1", True, "v11r1"), - ("v11r1", False, "11.1.0"), - # starting from PEP440 style - ("7.3.0a17", True, "v7r3-pre17"), - ("7.3.0a17", False, "7.3.0a17"), - ("7.3", True, "v7r3"), - ("7.3", False, "7.3.0"), - ("7.3.0", True, "v7r3"), - ("7.3.0", False, "7.3.0"), - ("7.3.1", True, "v7r3p1"), - ("7.3.1", False, "7.3.1"), - ("7.3.15", True, "v7r3p15"), - ("7.3.15", False, "7.3.15"), - ("7.13.15", True, "v7r13p15"), - ("7.13.15", False, "7.13.15"), - ("7.0.0", True, "v7r0"), - ("7.0.0", False, "7.0.0"), - ("7.0.0a1", True, "v7r0-pre1"), - ("7.0.0a1", False, "7.0.0a1"), - ], -) -def test_version_conversion(releaseVersion, useLegacyStyle, expected): - assert parseVersion(releaseVersion, useLegacyStyle) == expected - - -def readMessagesFromFileQueue(filename): - rqueue = queue.Queue() - with open(filename, "r") as myFile: - for line in myFile: - rqueue.put(line) - return rqueue - - -def dictWithoutKey(d, keyToRemove): - new_d = d.copy() - new_d.pop(keyToRemove, None) - return new_d - - -def removeTimeStampAndPilotUUID(message): - msg_result = dictWithoutKey(message, "timestamp") - return dictWithoutKey(msg_result, "pilotUUID") - - -# FIXME: this fails -def fixme_test_sendMessageToLocalFile(mocker, rmFiles): - mocker.patch("stomp.Connection", new=MagicMock()) - - msg_pattern = { - "status": "error", - "phase": "testing", - "messageContent": "test message", - "pilotUUID": "eda78924-d169-11e4-bfd2-0800275d1a0a", - "source": "testSource", - } - - with open(testOutputFile, "w"): - pass - - logger = ExtendedLogger(name="Pilot", debugFlag=True, pilotOutput="pilot.out", isPilotLoggerOn=True) - - logger.sendMessage(msg="test message", source="testSource", phase="testing", status="error", sendPilotLog=True) - rqueue = readMessagesFromFileQueue(testOutputFile) - - msg_result = json.loads(rqueue.get(block=False)) - msg_result = removeTimeStampAndPilotUUID(msg_result) - expected_msg = removeTimeStampAndPilotUUID(msg_pattern) - assert expected_msg == msg_result diff --git a/tests/CI/Test_simplePilotLogger.py b/tests/CI/Test_simplePilotLogger.py index dbd432c5..69ba0159 100644 --- a/tests/CI/Test_simplePilotLogger.py +++ b/tests/CI/Test_simplePilotLogger.py @@ -2,41 +2,105 @@ from __future__ import absolute_import, division, print_function -import consumeFromQueue import os -import json +import sys +import tempfile +import string +import random +import subprocess + +try: + from Pilot.pilotTools import CommandBase, PilotParams +except ImportError: + from pilotTools import CommandBase, PilotParams + import unittest +try: + from unittest.mock import patch +except ImportError: + from mock import patch -def dictWithoutKey(d, keyToRemove): - new_d = d.copy() - new_d.pop(keyToRemove, None) - return new_d +class TestPilotParams(unittest.TestCase): + @patch("sys.argv") + def test_pilotParamsInit(self, argvmock): + argvmock.__getitem__.return_value = [ + "-z", + "-d", + "-g", + "dummyURL", + "-F", + "tests/CI/pilot.json", + ] -class TestSimplePilotLogger(unittest.TestCase): - def test_SimplePilotLogger(self): - uuid = "37356d94-15c6-11e6-a600-606c663dde16" - filenameUUID = "PilotAgentUUID" - expectedMsgs = [ - { - u"status": u"Landed", - u"source": u"pilot", - u"pilotUUID": u"37356d94-15c6-11e6-a600-606c663dde16", - u"minorStatus": u"I will send an SOS to the world!", - } + pp = PilotParams() + + argvmock.__getitem__.assert_called() + self.assertEqual(argvmock.__getitem__.call_count, 3) + self.assertTrue(pp.pilotLogging) + self.assertEqual(pp.loggerURL, "dummyURL") + self.assertTrue(pp.debugFlag) + + +class TestCommandBase(unittest.TestCase): + def setUp(self): + # These temporary files, opened in text mode, will act as standard stream pipes for `Popen` + if sys.version_info.major == 3: + self.stdout_mock = tempfile.NamedTemporaryFile(mode="rb+", delete=False) + self.stderr_mock = tempfile.NamedTemporaryFile(mode="rb+", delete=False) + else: + self.stdout_mock = tempfile.NamedTemporaryFile(mode="r+", delete=False) + self.stderr_mock = tempfile.NamedTemporaryFile(mode="r+", delete=False) + + def tearDown(self): + # At the end of the test, we'll close and remove the created files + self.stdout_mock.close() + os.remove(self.stdout_mock.name) + os.remove(self.stderr_mock.name) + + @patch(("sys.argv")) + @patch("subprocess.Popen") + def test_executeAndGetOutput(self, popenMock, argvmock): + argvmock.__getitem__.return_value = [ + "-d", + "-g", + "dummyURL", + "-F", + "tests/CI/pilot.json", ] - with open(filenameUUID, "w") as myFile: - myFile.write(uuid) + for size in [1000, 1024, 1025, 2005]: + random_str = "".join( + random.choice(string.ascii_letters + "\n") for i in range(size) + ) + if sys.version_info.major == 3: + random_bytes = random_str.encode("UTF-8") + self.stdout_mock.write(random_bytes) + else: + self.stdout_mock.write(random_str) + self.stdout_mock.seek(0) + if sys.version_info.major == 3: + self.stderr_mock.write("Errare humanum est!".encode("UTF-8")) + else: + self.stderr_mock.write("Errare humanum est!") + self.stderr_mock.seek(0) + pp = PilotParams() + cBase = CommandBase(pp) + popenMock.return_value.stdout = self.stdout_mock + popenMock.return_value.stderr = self.stderr_mock + outData = cBase.executeAndGetOutput("dummy") + popenMock.assert_called() + self.assertEqual(outData[1], random_str) + self.stdout_mock.seek(0) + self.stderr_mock.seek(0) + self.stdout_mock.truncate() + self.stderr_mock.truncate() + - os.system('python PilotLogger.py "I will send an SOS to the world!"') - recvMsgs = consumeFromQueue.consume() - recvMsgs = [json.loads(x) for x in recvMsgs] - # we get rid of the timestamp field - expectedMsgs = [dictWithoutKey(x, "timestamp") for x in expectedMsgs] - recvMsgs = [dictWithoutKey(x, "timestamp") for x in recvMsgs] - self.assertEqual(expectedMsgs, recvMsgs) +class TestSimplePilotLogger(unittest.TestCase): + def test_SimplePilotLogger(self): + uuid = "37356d94-15c6-11e6-a600-606c663dde16" if __name__ == "__main__": diff --git a/tests/CI/consumeFromQueue.py b/tests/CI/consumeFromQueue.py deleted file mode 100644 index 4d28b58f..00000000 --- a/tests/CI/consumeFromQueue.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import stomp - - -class MyListener(stomp.ConnectionListener): - def __init__(self): - self.messages = [] - - def on_error(self, headers, message): - print('received an error "%s"' % message) - - def on_message(self, headers, message): - self.messages.append(message) - print(message) - - -def main(): - consume() - - -def consume(): - """ - Returns: - Queue: - """ - host_port = [("128.142.242.99", int(61614))] - key_file = "certificates/client/key.pem" - cert_file = "certificates/client/cert.pem" - ca_certs = "certificates/testca/cacert.pem" - conn = stomp.Connection(host_and_ports=host_port, use_ssl=True) - conn.set_ssl(for_hosts=host_port, key_file=key_file, cert_file=cert_file, ca_certs=ca_certs) - listener = MyListener() - conn.set_listener("", listener) - conn.start() # pylint: disable=no-member - conn.connect(wait=True) - conn.subscribe(destination="/queue/test", id=1, ack="auto") - conn.disconnect() - return listener.messages - - -if __name__ == "__main__": - main()