diff --git a/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst b/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst index a81f5a3f972..387fdbd1839 100644 --- a/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst +++ b/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst @@ -162,6 +162,21 @@ you should carefully read the RFC 18, and what follows. Pilot commands can be extended. A custom list of commands can be added starting the pilot with the -X option. +Pilots started when controlled by the SiteDirector +================================================== + +The :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` is a central component in DIRAC, +responsible for managing and optimizing the submission of pilot jobs to various computing resources. It features: + +- *Parallel Submission*: Capable of submitting pilot jobs in parallel across different Computing Elements (CEs) to enhance throughput. +- :py:mod:`~DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy`: It utilizes various submission policies to optimize pilot-job distribution: + - *AggressiveFilling*: Fills available slots regardless of waiting jobs, ideal for continuously busy sites. + - *WaitingSupportedJobs* (default one): Fills slots based on the number of waiting jobs, suitable for intermittently busy sites. +- *Monitoring and Accounting*: Features parallel monitoring and accounting for efficient tracking and management of pilot jobs. +- *Pilot Wrapping*: Creates pilot wrappers that facilitate the execution of pilot scripts in diverse environments, including Grid, cloud, and virtualized resources. +- *Resource Status Handling*: Integrates with the Resource Status System to ensure that pilots are only submitted to operational and enabled resources. + +The Site Director is controlled through different parameters set in the DIRAC configuration. More details in :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector`. Pilots started when not controlled by the SiteDirector ====================================================== diff --git a/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst b/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst index 35cf86f8da1..a61627e05bd 100644 --- a/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst +++ b/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst @@ -90,9 +90,6 @@ The following columns are provided: **Benchmark** Estimation of the power of the Worker Node CPU which is running the Pilot Job. If 0, the estimation was not possible. -**TaskQueueID** - Internal DIRAC WMS identifier of the Task Queue for which the Pilot Job is sent. - **PilotID** Internal DIRAC WMS Pilot Job identifier diff --git a/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py b/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py index 983062fabf2..990e9009c94 100644 --- a/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py +++ b/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py @@ -252,10 +252,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= for site in sites: if siteList and site not in siteList: continue - if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/VO", []) - if comList and community.lower() not in [cl.lower() for cl in comList]: - continue + siteCEParameters = {} result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs") if result["OK"]: @@ -272,10 +269,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= continue if ceList and ce not in ceList: continue - if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", []) - if comList and community.lower() not in [cl.lower() for cl in comList]: - continue + ceOptionsDict = dict(siteCEParameters) result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs/{ce}") if not result["OK"]: @@ -287,9 +281,23 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= queues = result["Value"] for queue in queues: if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", []) + # Community can be defined on site, CE or queue level + paths = [ + f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", + f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", + f"/Resources/Sites/{grid}/{site}/VO", + ] + + # Try each path in order, stopping when we find a non-empty list + for path in paths: + comList = gConfig.getValue(path, []) + if comList: + break + + # If we found a list and the community is not in it, skip this iteration if comList and community.lower() not in [cl.lower() for cl in comList]: continue + if tags: queueTags = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/Tag", []) queueTags = set(ceTags + queueTags) diff --git a/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py b/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py index cc6701d2af4..00b5304c43e 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py +++ b/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py @@ -16,8 +16,7 @@ 'PilotJobReference': 'https://marlb.in2p3.fr:9000/biq6KT45Q', 'PilotStamp': '', 'Status': 'Done', - 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52), - 'TaskQueueID': 399L}} + 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}} """ from DIRAC.Core.Base.Script import Script diff --git a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py index b3cbaa7cd5c..224606957fc 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py +++ b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py @@ -15,8 +15,7 @@ 'PilotJobReference': 'https://marlb.in2p3.fr:9000/2KHFrQjkw', 'PilotStamp': '', 'Status': 'Done', - 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52), - 'TaskQueueID': 399L}} + 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}} """ from DIRAC.Core.Base.Script import Script diff --git a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py index 1aa3ac06e60..ff101940541 100644 --- a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py +++ b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py @@ -17,7 +17,7 @@ def __init__(self): super().__init__() - self.keyFields = ["TaskQueueID", "GridSite", "GridType", "Status"] + self.keyFields = ["GridSite", "GridType", "Status"] self.monitoringFields = ["NumOfPilots"] @@ -25,7 +25,6 @@ def __init__(self): self.addMapping( { - "TaskQueueID": {"type": "keyword"}, "GridSite": {"type": "keyword"}, "GridType": {"type": "keyword"}, "Status": {"type": "keyword"}, diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index e7afc2e5b4e..3d8622d5e66 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -92,7 +92,6 @@ def setToken(self, token): """Set the token and update the headers :param token: OAuth2Token object or dictionary containing token structure - :param int valid: validity period in seconds """ super().setToken(token) self.headers["Authorization"] = "Bearer " + self.token["access_token"] @@ -619,7 +618,11 @@ def getCEStatus(self): self.log.error("Failed getting the status of the CE.", result["Message"]) return S_ERROR("Failed getting the status of the CE") response = result["Value"] - ceData = response.json() + try: + ceData = response.json() + except requests.JSONDecodeError: + self.log.exception("Failed decoding the status of the CE") + return S_ERROR(f"Failed decoding the status of the CE") # Look only in the relevant section out of the headache queueInfo = ceData["Domains"]["AdminDomain"]["Services"]["ComputingService"]["ComputingShare"] diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index 0559b2ff95d..e98f9b5aec9 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -355,7 +355,7 @@ def _getMetadata(self, executableFile, pilotStamp=""): template = yaml.safe_load(template_fd) for filedef in template["write_files"]: if filedef["content"] == "PROXY_STR": - filedef["content"] = self.proxy + filedef["content"] = self.proxy.dumpAllToString()["Value"] elif filedef["content"] == "EXECUTABLE_STR": filedef["content"] = exe_str elif "STAMP_STR" in filedef["content"]: @@ -398,11 +398,7 @@ def _renewCloudProxy(self): if not res["OK"]: self.log.error("Could not download proxy", res["Message"]) return False - resdump = res["Value"].dumpAllToString() - if not resdump["OK"]: - self.log.error("Failed to dump proxy to string", resdump["Message"]) - return False - self.proxy = resdump["Value"] + self.proxy = res["Value"] self.valid = datetime.datetime.utcnow() + proxyLifetime * datetime.timedelta(seconds=1) return True diff --git a/src/DIRAC/Resources/Computing/ComputingElement.py b/src/DIRAC/Resources/Computing/ComputingElement.py index 96aa7705d69..219c0bd6bed 100755 --- a/src/DIRAC/Resources/Computing/ComputingElement.py +++ b/src/DIRAC/Resources/Computing/ComputingElement.py @@ -82,7 +82,6 @@ def __init__(self, ceName): self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs self.proxyCheckPeriod = gConfig.getValue("/Registry/ProxyCheckingPeriod", 3600) # secs - self.valid = None self.batchSystem = None self.taskResults = {} @@ -97,10 +96,9 @@ def __init__(self, ceName): self.initializeParameters() self.log.debug("CE parameters", self.ceParameters) - def setProxy(self, proxy, valid=0): + def setProxy(self, proxy): """Set proxy for this instance""" self.proxy = proxy - self.valid = datetime.datetime.utcnow() + second * valid def setToken(self, token): self.token = token @@ -116,21 +114,6 @@ def _prepareProxy(self): self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}") return S_OK() - def isProxyValid(self, valid=1000): - """Check if the stored proxy is valid""" - if not self.valid: - result = S_ERROR("Proxy is not valid for the requested length") - result["Value"] = 0 - return result - delta = self.valid - datetime.datetime.utcnow() - totalSeconds = delta.days * 86400 + delta.seconds - if totalSeconds > valid: - return S_OK(totalSeconds - valid) - - result = S_ERROR("Proxy is not valid for the requested length") - result["Value"] = totalSeconds - valid - return result - def initializeParameters(self): """Initialize the CE parameters after they are collected from various sources""" @@ -258,72 +241,49 @@ def setCPUTimeLeft(self, cpuTimeLeft=None): return S_ERROR("Wrong type for setCPUTimeLeft argument") ############################################################################# - def available(self, jobIDList=None): + def available(self): """This method returns the number of available slots in the target CE. The CE instance polls for waiting and running jobs and compares to the limits in the CE parameters. - - :param list jobIDList: list of already existing job IDs to be checked against """ + result = self.getCEStatus() + if not result["OK"]: + return result - # If there are no already registered jobs - if jobIDList is not None and not jobIDList: - result = S_OK() - result["RunningJobs"] = 0 - result["WaitingJobs"] = 0 - result["SubmittedJobs"] = 0 - else: - result = self.getCEStatus() - if not result["OK"]: - return result runningJobs = result["RunningJobs"] waitingJobs = result["WaitingJobs"] - submittedJobs = result["SubmittedJobs"] availableProcessors = result.get("AvailableProcessors") ceInfoDict = dict(result) maxTotalJobs = int(self.ceParameters.get("MaxTotalJobs", 0)) ceInfoDict["MaxTotalJobs"] = maxTotalJobs - waitingToRunningRatio = float(self.ceParameters.get("WaitingToRunningRatio", 0.0)) - # if there are no Running job we can submit to get at most 'MaxWaitingJobs' - # if there are Running jobs we can increase this to get a ratio W / R 'WaitingToRunningRatio' - maxWaitingJobs = int(max(int(self.ceParameters.get("MaxWaitingJobs", 0)), runningJobs * waitingToRunningRatio)) + maxWaitingJobs = int(self.ceParameters.get("MaxWaitingJobs", 0)) + ceInfoDict["MaxWaitingJobs"] = maxWaitingJobs self.log.verbose("Max Number of Jobs:", maxTotalJobs) - self.log.verbose("Max W/R Ratio:", waitingToRunningRatio) self.log.verbose("Max Waiting Jobs:", maxWaitingJobs) - # Determine how many more jobs can be submitted - message = f"{self.ceName} CE: SubmittedJobs={submittedJobs}" - message += f", WaitingJobs={waitingJobs}, RunningJobs={runningJobs}" + result["CEInfoDict"] = ceInfoDict + # If we reached the maximum number of total jobs, then the CE is not available totalJobs = runningJobs + waitingJobs - - message += f", MaxTotalJobs={maxTotalJobs}" - if totalJobs >= maxTotalJobs: - self.log.verbose("Max Number of Jobs reached:", maxTotalJobs) + self.log.verbose("Max Number of Jobs reached:", f"{totalJobs} >= {maxTotalJobs}") result["Value"] = 0 - message = "There are {} waiting jobs and total jobs {} >= {} max total jobs".format( - waitingJobs, - totalJobs, - maxTotalJobs, - ) - else: - additionalJobs = 0 - if waitingJobs < maxWaitingJobs: - additionalJobs = maxWaitingJobs - waitingJobs - if totalJobs + additionalJobs >= maxTotalJobs: - additionalJobs = maxTotalJobs - totalJobs - # For SSH CE case - if int(self.ceParameters.get("MaxWaitingJobs", 0)) == 0: - additionalJobs = maxTotalJobs - runningJobs - - if availableProcessors is not None: - additionalJobs = min(additionalJobs, availableProcessors) - result["Value"] = additionalJobs - - result["Message"] = message - result["CEInfoDict"] = ceInfoDict + return result + + # If we reached the maximum number of waiting jobs, then the CE is not available + if waitingJobs >= maxWaitingJobs: + self.log.verbose("Max Number of waiting jobs reached:", f"{waitingJobs} >= {maxWaitingJobs}") + result["Value"] = 0 + return result + + additionalJobs = maxWaitingJobs - waitingJobs + if totalJobs + additionalJobs >= maxTotalJobs: + additionalJobs = maxTotalJobs - totalJobs + + if availableProcessors is not None: + additionalJobs = min(additionalJobs, availableProcessors) + result["Value"] = additionalJobs return result ############################################################################# diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index 605851fbfdf..c7ab66d7bd6 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -307,15 +307,14 @@ def __init__(self, ceUniqueID): self.errorTemplate = "" ############################################################################ - def setProxy(self, proxy, valid=0): + def setProxy(self, proxy): """ Set and prepare proxy to use :param str proxy: proxy to use - :param int valid: proxy validity period :return: S_OK/S_ERROR """ - ComputingElement.setProxy(self, proxy, valid) + ComputingElement.setProxy(self, proxy) if self.ceParameters.get("SSHType", "ssh") == "gsissh": result = self._prepareProxy() if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 89ce0a1dc9d..f6bba0c77d2 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -324,8 +324,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes): result = getQueuesResolved( siteDict=result["Value"], queueCECache=self.queueCECache, - gridEnv=getGridEnv(), - setup=gConfig.getValue("/DIRAC/Setup", "unknown"), instantiateCEs=True, ) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index b6b0ea64237..8c9af36ca58 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -9,11 +9,9 @@ """ import datetime import os -import random -import socket -import sys from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any import DIRAC from DIRAC import S_ERROR, S_OK, gConfig @@ -24,8 +22,9 @@ ) from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals, Registry from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Security import X509Chain from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager @@ -34,20 +33,18 @@ from DIRAC.Resources.Computing.ComputingElement import ComputingElement from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus +from DIRAC.Resources.Computing.ComputingElement import ComputingElement from DIRAC.WorkloadManagementSystem.Client import PilotStatus -from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB -from DIRAC.WorkloadManagementSystem.private.ConfigHelper import ( - findGenericPilotCredentials, -) -from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv +from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import ( _writePilotWrapperFile, getPilotFilesCompressedEncodedDict, pilotWrapperScript, ) from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS MAX_PILOTS_TO_SUBMIT = 100 @@ -62,69 +59,36 @@ def __init__(self, *args, **kwargs): """c'tor""" super().__init__(*args, **kwargs) - # on-the fly imports - ol = ObjectLoader() - res = ol.loadModule("ConfigurationSystem.Client.Helpers.Resources") - if not res["OK"]: - sys.exit(res["Message"]) - self.resourcesModule = res["Value"] - self.queueDict = {} # self.queueCECache aims at saving CEs information over the cycles to avoid to create the exact same CEs each cycle self.queueCECache = {} - self.queueSlots = {} self.failedQueues = defaultdict(int) - # failedPilotOutput stores the number of times the Site Director failed to get a given pilot output - self.failedPilotOutput = defaultdict(int) - self.firstPass = True self.maxPilotsToSubmit = MAX_PILOTS_TO_SUBMIT - self.gridEnv = "" self.vo = "" self.pilotDN = "" self.pilotGroup = "" - self.platforms = [] - self.sites = [] - self.totalSubmittedPilots = 0 - - self.addPilotsToEmptySites = False - self.checkPlatform = False - self.updateStatus = True - self.getOutput = False + self.sendAccounting = True self.sendSubmissionAccounting = True self.sendSubmissionMonitoring = False + self.siteClient = None self.rssClient = None self.pilotAgentsDB = None self.rssFlag = None - self.globalParameters = {"NumberOfProcessors": 1, "MaxRAM": 2048} # self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again self.failedQueueCycleFactor = 10 - # Every N cycles, the status of the pilots are updated by the SiteDirector - self.pilotStatusUpdateCycleFactor = 10 - # Every N cycles, the number of slots available in the queues is updated - self.availableSlotsUpdateCycleFactor = 10 - self.maxQueueLength = 86400 * 3 - # Maximum number of times the Site Director is going to try to get a pilot output before stopping - self.maxRetryGetPilotOutput = 3 - - self.pilotWaitingFlag = True - self.pilotLogLevel = "INFO" - self.matcherClient = None - self.siteMaskList = [] - self.ceMaskList = [] + self.submissionPolicyName = WAITING_SUPPORTED_JOBS + self.submissionPolicy = None - self.localhost = socket.getfqdn() + self.workingDirectory = None + self.maxQueueLength = 86400 * 3 def initialize(self): """Initial settings""" - self.gridEnv = self.am_getOption("GridEnv", "") - if not self.gridEnv: - self.gridEnv = getGridEnv() - # The SiteDirector is for a particular user community self.vo = self.am_getOption("VO", self.am_getOption("Community", "")) if not self.vo: @@ -135,11 +99,12 @@ def initialize(self): # Get the clients self.siteClient = SiteStatus() self.rssClient = ResourceStatus() - self.matcherClient = MatcherClient() self.pilotAgentsDB = getPilotAgentsDB() return S_OK() + ##################################################################################### + def beginExecution(self): """This is run at every cycle, as first thing. @@ -148,11 +113,8 @@ def beginExecution(self): 3. Get the site description dictionary 4. Get what to send in pilot wrapper """ - - self.rssFlag = self.rssClient.rssFlag - # Which credentials to use? - # are they specific to the SD? (if not, get the generic ones) + # Are they specific to the SD? (if not, get the generic ones) self.pilotDN = self.am_getOption("PilotDN", self.pilotDN) self.pilotGroup = self.am_getOption("PilotGroup", self.pilotGroup) result = findGenericPilotCredentials(vo=self.vo, pilotDN=self.pilotDN, pilotGroup=self.pilotGroup) @@ -161,26 +123,20 @@ def beginExecution(self): self.pilotDN, self.pilotGroup = result["Value"] # Parameters - self.workingDirectory = self.am_getOption("WorkDirectory") + self.workingDirectory = self.am_getOption("WorkDirectory", self.workingDirectory) self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength) - self.pilotLogLevel = self.am_getOption("PilotLogLevel", self.pilotLogLevel) self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit) - self.pilotWaitingFlag = self.am_getOption("PilotWaitingFlag", self.pilotWaitingFlag) self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor) - self.pilotStatusUpdateCycleFactor = self.am_getOption( - "PilotStatusUpdateCycleFactor", self.pilotStatusUpdateCycleFactor - ) - self.availableSlotsUpdateCycleFactor = self.am_getOption( - "AvailableSlotsUpdateCycleFactor", self.availableSlotsUpdateCycleFactor - ) - self.maxRetryGetPilotOutput = self.am_getOption("MaxRetryGetPilotOutput", self.maxRetryGetPilotOutput) + + # Load submission policy + self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName) + result = self._loadSubmissionPolicy() + if not result: + return result # Flags - self.addPilotsToEmptySites = self.am_getOption("AddPilotsToEmptySites", self.addPilotsToEmptySites) - self.checkPlatform = self.am_getOption("CheckPlatform", self.checkPlatform) - self.updateStatus = self.am_getOption("UpdatePilotStatus", self.updateStatus) - self.getOutput = self.am_getOption("GetPilotOutput", self.getOutput) self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting) + self.rssFlag = self.rssClient.rssFlag # Check whether to send to Monitoring or Accounting or both monitoringOption = Operations().getMonitoringBackends(monitoringType="PilotSubmissionMonitoring") @@ -188,80 +144,20 @@ def beginExecution(self): self.sendSubmissionMonitoring = True if "Accounting" in monitoringOption: self.sendSubmissionAccounting = True - # Get the site description dictionary - siteNames = None - siteNamesOption = self.am_getOption("Site", ["any"]) - if siteNamesOption and "any" not in [sn.lower() for sn in siteNamesOption]: - siteNames = siteNamesOption - - ceTypes = None - ceTypesOption = self.am_getOption("CETypes", ["any"]) - if ceTypesOption and "any" not in [ct.lower() for ct in ceTypesOption]: - ceTypes = ceTypesOption - - ces = None - cesOption = self.am_getOption("CEs", ["any"]) - if cesOption and "any" not in [ce.lower() for ce in cesOption]: - ces = cesOption + # Get the site description dictionary + siteNames = self.am_getOption("Site", []) + ceTypes = self.am_getOption("CETypes", []) + ces = self.am_getOption("CEs", []) tags = self.am_getOption("Tags", []) - if not tags: - tags = None + # Display options used self.log.always("VO:", self.vo) self.log.always("Sites:", siteNames) self.log.always("CETypes:", ceTypes) self.log.always("CEs:", ces) self.log.always("PilotDN:", self.pilotDN) self.log.always("PilotGroup:", self.pilotGroup) - - result = self.resourcesModule.getQueues( - community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags - ) - if not result["OK"]: - return result - result = getQueuesResolved( - siteDict=result["Value"], - queueCECache=self.queueCECache, - gridEnv=self.gridEnv, - setup=gConfig.getValue("/DIRAC/Setup", "unknown"), - workingDir=self.workingDirectory, - checkPlatform=self.checkPlatform, - instantiateCEs=True, - ) - if not result["OK"]: - return result - - self.queueDict = result["Value"] - for __queueName, queueDict in self.queueDict.items(): - # Update self.sites - if queueDict["Site"] not in self.sites: - self.sites.append(queueDict["Site"]) - - # Update self.platforms, keeping entries unique and squashing lists - self.platforms = [] - if "Platform" in queueDict["ParametersDict"]: - platform = queueDict["ParametersDict"]["Platform"] - oldPlatforms = set(self.platforms) - if isinstance(platform, list): - oldPlatforms.update(set(platform)) - else: - oldPlatforms.add(platform) - self.platforms = list(oldPlatforms) - - # Update self.globalParameters - if "WholeNode" in queueDict["ParametersDict"]: - self.globalParameters["WholeNode"] = "True" - for parameter in ["MaxRAM", "NumberOfProcessors"]: - if parameter in queueDict["ParametersDict"]: - self.globalParameters[parameter] = max( - self.globalParameters[parameter], int(queueDict["ParametersDict"][parameter]) - ) - - if self.updateStatus: - self.log.always("Pilot status update requested") - if self.getOutput: - self.log.always("Pilot output retrieval requested") if self.sendAccounting: self.log.always("Pilot accounting sending requested") if self.sendSubmissionAccounting: @@ -271,33 +167,69 @@ def beginExecution(self): self.log.always("MaxPilotsToSubmit:", self.maxPilotsToSubmit) - if self.firstPass: - if self.queueDict: - self.log.always("Agent will serve queues:") - for queue in self.queueDict: - self.log.always( - f"Site: {self.queueDict[queue]['Site']}, CE: {self.queueDict[queue]['CEName']}, Queue: {queue}" - ) - self.firstPass = False + # Build the dictionary of queues that are going to be used: self.queueDict + result = self._buildQueueDict(siteNames, ceTypes, ces, tags) + if not result: + return result + + # Stop the execution if there is no usable queue + if not self.queueDict: + self.log.error("No usable queue, exiting the cycle") + return S_ERROR("No usable queue, exiting the cycle") + + self.log.always("Agent will serve queues:") + for queue in self.queueDict: + self.log.always( + f"Site: {self.queueDict[queue]['Site']}, CE: {self.queueDict[queue]['CEName']}, Queue: {queue}" + ) return S_OK() - def execute(self): - """Main execution method (what is called at each agent cycle). + def _loadSubmissionPolicy(self): + """Load a submission policy""" + objectLoader = ObjectLoader() + result = objectLoader.loadObject( + "WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy" + ) + if not result["OK"]: + self.log.error(f"Failed to load submission policy: {result['Message']}") + return result - It basically just calls self.submitPilots() method - """ + self.submissionPolicy = result["Value"]() + return S_OK() - if not self.queueDict: - self.log.warn("No site defined, exiting the cycle") - return S_OK() + def _buildQueueDict( + self, + siteNames: list[str] | None = None, + ces: list[str] | None = None, + ceTypes: list[str] | None = None, + tags: list[str] | None = None, + ): + """Build the queueDict dictionary containing information about the queues that will be provisioned""" + # Get details about the resources + result = getQueues(community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags) + if not result["OK"]: + return result + + # Set up the queue dictionary + result = getQueuesResolved( + siteDict=result["Value"], + queueCECache=self.queueCECache, + vo=self.vo, + instantiateCEs=True, + ) + if not result["OK"]: + return result + self.queueDict = result["Value"] - # get list of usable sites within this cycle - result = self.siteClient.getUsableSites() + # Get list of usable sites within this cycle + result = self.siteClient.getUsableSites(siteNames) if not result["OK"]: return result - self.siteMaskList = result.get("Value", []) + siteMaskList = result.get("Value", []) + # Get list of usable CEs + ceMaskList = [] if self.rssFlag: ceNamesList = [queue["CEName"] for queue in self.queueDict.values()] result = self.rssClient.getElementStatus(ceNamesList, "ComputingElement", vO=self.vo) @@ -305,402 +237,191 @@ def execute(self): self.log.error("Can not get the status of computing elements: ", result["Message"]) return result # Try to get CEs which have been probed and those unprobed (vO='all'). - self.ceMaskList = [ + ceMaskList = [ ceName for ceName in result["Value"] if result["Value"][ceName]["all"] in ("Active", "Degraded") ] - self.log.debug("CE list with status Active or Degraded: ", self.ceMaskList) - - result = self.submitPilots() - if not result["OK"]: - self.log.error("Errors in the job submission: ", result["Message"]) - return result - - # Every N cycles we update the pilots status - cyclesDone = self.am_getModuleParam("cyclesDone") - if self.updateStatus and cyclesDone % self.pilotStatusUpdateCycleFactor == 0: - result = self.updatePilotStatus() - if not result["OK"]: - self.log.error("Errors in updating pilot status: ", result["Message"]) - return result - - return S_OK() - - def submitPilots(self): - """Go through defined computing elements and submit pilots if necessary and possible - - :return: S_OK/S_ERROR - """ - - # First, we check if we want to submit pilots at all, and also where - submit, anySite, jobSites, testSites = self._ifAndWhereToSubmit() - if not submit: - self.log.notice("Not submitting any pilots at this cycle") - return S_OK() - # From here on we assume we are going to (try to) submit some pilots - self.log.debug("Going to try to submit some pilots") + # Filter the unusable queues + for queueName in list(self.queueDict.keys()): + site = self.queueDict[queueName]["Site"] + ce = self.queueDict[queueName]["CEName"] - self.log.verbose("Queues treated", ",".join(self.queueDict)) + # Check the status of the Site + if site in siteMaskList: + continue - self.totalSubmittedPilots = 0 + # Check the status of the CE (only for RSS=Active) + if not self.rssFlag or (self.rssFlag and ce in ceMaskList): + continue - queueDictItems = list(self.queueDict.items()) - random.shuffle(queueDictItems) + self.log.warn("Queue not considered because not usable:", queueName) + self.queueDict.pop(queueName) - for queueName, queueDictionary in queueDictItems: - # now submitting to the single queues - self.log.verbose("Evaluating queue", queueName) + return S_OK() - # are we going to submit pilots to this specific queue? - if not self._allowedToSubmit(queueName, anySite, jobSites, testSites): - continue + ##################################################################################### - if "CPUTime" in queueDictionary["ParametersDict"]: - queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"]) - else: - self.log.warn("CPU time limit is not specified, skipping", f"queue {queueName}") - continue - if queueCPUTime > self.maxQueueLength: - queueCPUTime = self.maxQueueLength + def execute(self): + """Main execution method (what is called at each agent cycle). - ce, ceDict = self._getCE(queueName) + It basically just submits pilots and gets their status + """ + submissionResult = self.submitPilots() + monitoringResult = self.monitorPilots() - # additionalInfo is normally taskQueueDict - pilotsWeMayWantToSubmit, additionalInfo = self._getPilotsWeMayWantToSubmit(ceDict) - self.log.debug(f"{pilotsWeMayWantToSubmit} pilotsWeMayWantToSubmit are eligible for {queueName} queue") - if not pilotsWeMayWantToSubmit: - self.log.debug(f"...so skipping {queueName}") - continue + if not submissionResult["OK"]: + return submissionResult - # Get the number of already waiting pilots for the queue - totalWaitingPilots = 0 - manyWaitingPilotsFlag = False - if self.pilotWaitingFlag: - tqIDList = list(additionalInfo) - result = self.pilotAgentsDB.countPilots( - {"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None - ) - if not result["OK"]: - self.log.error("Failed to get Number of Waiting pilots", result["Message"]) - totalWaitingPilots = 0 - else: - totalWaitingPilots = result["Value"] - self.log.debug(f"Waiting Pilots: {totalWaitingPilots}") - if totalWaitingPilots >= pilotsWeMayWantToSubmit: - self.log.verbose("Possibly enough pilots already waiting", f"({totalWaitingPilots})") - manyWaitingPilotsFlag = True - if not self.addPilotsToEmptySites: - continue - - self.log.debug( - f"{totalWaitingPilots} waiting pilots for the total of {pilotsWeMayWantToSubmit} eligible pilots for {queueName}" - ) + if not monitoringResult["OK"]: + return monitoringResult - # Get the number of available slots on the target site/queue - totalSlots = self.getQueueSlots(queueName, manyWaitingPilotsFlag) - if totalSlots <= 0: - self.log.debug(f"{queueName}: No slots available") - continue + return S_OK() - if manyWaitingPilotsFlag: - # Throttle submission of extra pilots to empty sites - pilotsToSubmit = int(self.maxPilotsToSubmit / 10) + 1 - else: - pilotsToSubmit = max(0, min(totalSlots, pilotsWeMayWantToSubmit - totalWaitingPilots)) - self.log.info( - f"{queueName}: Slots={totalSlots}, TQ jobs(pilotsWeMayWantToSubmit)={pilotsWeMayWantToSubmit}, Pilots: waiting {totalWaitingPilots}, to submit={pilotsToSubmit}" - ) + ##################################################################################### - # Limit the number of pilots to submit to MAX_PILOTS_TO_SUBMIT - pilotsToSubmit = min(self.maxPilotsToSubmit, pilotsToSubmit) + def submitPilots(self): + """Go through defined computing elements and submit pilots if necessary and possible""" + # Getting the status of pilots in a queue implies the use of remote CEs and may lead to network latency + # Threads aim at overcoming such issues and thus 1 thread per queue is created to submit pilots + self.log.verbose("Submission: Queues treated are", ",".join(self.queueDict)) - # Get the working proxy - cpuTime = queueCPUTime + 86400 - self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.pilotGroup} {cpuTime} long") - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, cpuTime) - if not result["OK"]: - return result - proxy = result["Value"] - # Check returned proxy lifetime - result = proxy.getRemainingSecs() # pylint: disable=no-member - if not result["OK"]: - return result - lifetime_secs = result["Value"] - ce.setProxy(proxy, lifetime_secs) + errors = [] + with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor: + futures = [] + for queue in self.queueDict: + futures.append(executor.submit(self._submitPilotsPerQueue, queue)) - # Get valid token if needed - if self.__supportToken(ce): - result = self.__getPilotToken(audience=ce.audienceName) + for future in as_completed(futures): + result = future.result() if not result["OK"]: - return result - ce.setToken(result["Value"]) - - # now really submitting - res = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) - if not res["OK"]: - self.log.info("Failed pilot submission", f"Queue: {queueName}") - else: - pilotList, stampDict = res["Value"] - - # updating the pilotAgentsDB... done by default but maybe not strictly necessary - self._addPilotTQReference(queueName, additionalInfo, pilotList, stampDict) + errors.append(result["Message"]) - # Summary after the cycle over queues - self.log.info("Total number of pilots submitted in this cycle", f"{self.totalSubmittedPilots}") + if errors: + self.log.error("The following errors occurred during the pilot submission operation", "\n".join(errors)) + return S_ERROR("Pilot submission: errors occurred") return S_OK() - def __supportToken(self, ce: ComputingElement) -> bool: - """Check whether the SiteDirector is able to submit pilots with tokens. + def _submitPilotsPerQueue(self, queueName: str): + """Submit pilots within a given computing elements - * the CE is able to receive any token. Validation: Tag = Token should be included in the CE parameters. - * the CE is able to receive VO-specifc tokens. Validation: Tag = Token: should be included in the CE parameters. + :return: S_OK/S_ERROR """ - return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", []) + queueDictionary = self.queueDict[queueName] - def __getPilotToken(self, audience: str, scope: list[str] = None): - """Get the token corresponding to the pilot user identity - - :param audience: Token audience, targeting a single CE - :param scope: list of permissions needed to interact with a CE - :return: S_OK/S_ERROR, Token object as Value - """ - if not audience: - return S_ERROR("Audience is not defined") + # Are we allowed to submit pilots to this specific queue? + failedCount = self.failedQueues[queueName] % self.failedQueueCycleFactor + if failedCount != 0: + self.log.warn( + "Queue failed recently ==> number of cycles skipped", + f"{queueName} ==> {self.failedQueueCycleFactor - failedCount}", + ) + self.failedQueues[queueName] += 1 + return S_OK() - if not scope: - scope = PILOT_SCOPES + # Adjust queueCPUTime: needed to generate the proxy + if "CPUTime" not in queueDictionary["ParametersDict"]: + self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}") + return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}") - return gTokenManager.getToken(userGroup=self.pilotGroup, requiredTimeLeft=600, scope=scope, audience=audience) + queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"]) + if queueCPUTime > self.maxQueueLength: + queueCPUTime = self.maxQueueLength - def _ifAndWhereToSubmit(self): - """Return a tuple that says if and where to submit pilots: + # Get CE instance + ce = self.queueDict[queueName]["CE"] - (submit, anySite, jobSites, testSites) - e.g. - (True, False, {'Site1', 'Site2'}, {'Test1', 'Test2'}) + # Set credentials + cpuTime = queueCPUTime + 86400 + result = self._setCredentials(ce, cpuTime) + if not result["OK"]: + self.log.error("Failed to set credentials:", result["Message"]) + return result - VOs may want to replace this method with different strategies - """ + # Get the number of available slots on the target site/queue + totalSlots = self._getQueueSlots(queueName) + if totalSlots <= 0: + self.log.verbose(f"{queueName}: No slot available") + return S_ERROR(f"{queueName}: No slot available") + self.log.info(f"{queueName}: to submit={totalSlots}") - tqDict = self._getTQDictForMatching() - if not tqDict: - return True, True, set(), set() + # Apply the submission policy + totalSlots = self.submissionPolicy.apply(totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters) - # the tqDict used here is a very generic one, not specific to one CE/queue only - self.log.verbose("Checking overall TQ availability with requirements") - self.log.verbose(tqDict) + # Limit the number of pilots to submit to self.maxPilotsToSubmit + pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots) - # Check that there is some work at all - result = self.matcherClient.getMatchingTaskQueues(tqDict) + # Now really submitting + result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) if not result["OK"]: - self.log.error("Matcher error: ", result["Message"]) - return False, True, set(), set() - matchingTQs = result["Value"] - if not matchingTQs: - self.log.notice("No Waiting jobs suitable for the director, so nothing to submit") - return False, True, set(), set() - - # If we are here there's some work to do, now let's see for where - jobSites = set() - testSites = set() - anySite = False - - for tqDescription in matchingTQs.values(): - siteList = tqDescription.get("Sites", []) - if siteList: - jobSites |= set(siteList) - else: - anySite = True - - if "JobTypes" in tqDescription: - if "Sites" in tqDescription: - for site in tqDescription["Sites"]: - if site.lower() != "any": - testSites.add(site) - - self.monitorJobsQueuesPilots(matchingTQs) + self.log.info("Failed pilot submission", f"Queue: {queueName}") + return result + pilotList, stampDict = result["Value"] - return True, anySite, jobSites, testSites + # updating the pilotAgentsDB... done by default but maybe not strictly necessary + result = self._addPilotReferences(queueName, pilotList, stampDict) + if not result["OK"]: + return result - def monitorJobsQueuesPilots(self, matchingTQs): - """Just printout of jobs queues and pilots status in TQ""" - tqIDList = list(matchingTQs) - result = self.pilotAgentsDB.countPilots( - {"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None - ) + # Summary after the cycle over queues + self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}") + return S_OK() - totalWaitingJobs = 0 - for tqDescription in matchingTQs.values(): - totalWaitingJobs += tqDescription["Jobs"] + def _getQueueSlots(self, queue: str): + """Get the number of available slots in the queue""" + ce = self.queueDict[queue]["CE"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] - if not result["OK"]: - self.log.error("Can't count pilots", result["Message"]) - else: + # First, try to get available slots from the CE + result = ce.available() + if result["OK"]: + ceInfoDict = result["CEInfoDict"] self.log.info( - "Total jobs : number of task queues : number of waiting pilots", - f"{totalWaitingJobs} : {len(tqIDList)} : {result['Value']}", + "CE queue report", + f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}", ) + return result["Value"] - def _getTQDictForMatching(self): - """Just construct a dictionary (tqDict) - that will be used to check with Matcher if there's anything to submit. - - If extensions want, they can replace partly or fully this method. - If it returns just an empty dict, the assuption is that we'll submit pilots no matters what. - - :returns dict: tqDict of task queue descriptions - """ - tqDict = {"Setup": CSGlobals.getSetup(), "CPUTime": 9999999} - tqDict["Community"] = self.vo - - if self.checkPlatform: - platforms = self._getPlatforms() - if platforms: - tqDict["Platform"] = platforms - - tqDict["Site"] = self.sites - - # Get a union of all tags - tags = [] - for queue in self.queueDict: - tags += self.queueDict[queue]["ParametersDict"].get("Tag", []) - tqDict["Tag"] = list(set(tags)) - - # Add overall max values for all queues - tqDict.update(self.globalParameters) - - return tqDict - - def _getPlatforms(self): - """Get the platforms used for TQ match - Here for extension purpose. + # If we cannot get available slots from the CE, then we get them from the pilotAgentsDB + maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10)) + maxTotalJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxTotalJobs", 10)) - :return: list of platforms - """ - result = self.resourcesModule.getCompatiblePlatforms(self.platforms) + # Get the number of transient pilots + result = self.pilotAgentsDB.countPilots( + {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_TRANSIENT_STATES} + ) if not result["OK"]: - self.log.error( - "Issue getting compatible platforms, will skip check of platforms", - self.platforms + " : " + result["Message"], - ) - return result["Value"] - - def _allowedToSubmit(self, queue, anySite, jobSites, testSites): - """Check if we are allowed to submit to a certain queue - - :param str queue: the queue name - :param bool anySite: submitting anywhere? - :param set jobSites: set of job site names (only considered if anySite is False) - :param set testSites: set of test site names - - :return: True/False - """ - - # Check if the queue failed previously - failedCount = self.failedQueues[queue] % self.failedQueueCycleFactor - if failedCount != 0: - self.log.warn("queue failed recently ==> number of cycles skipped", f"{queue} ==> {10 - failedCount}") + self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") self.failedQueues[queue] += 1 - return False - - # Check the status of the site - if self.queueDict[queue]["Site"] not in self.siteMaskList and self.queueDict[queue]["Site"] not in testSites: - self.log.verbose( - "Queue skipped (site not in mask)", - f"{self.queueDict[queue]['QueueName']} ({self.queueDict[queue]['Site']})", - ) - return False - - # Check that there are task queues waiting for this site - if not anySite and self.queueDict[queue]["Site"] not in jobSites: - self.log.verbose( - "Queue skipped: no workload expected", - f"{self.queueDict[queue]['CEName']} at {self.queueDict[queue]['Site']}", - ) - return False - - # Check the status of the CE (only for RSS=Active) - if self.rssFlag: - if self.queueDict[queue]["CEName"] not in self.ceMaskList: - self.log.verbose( - "Skipping computing element: resource not usable", - f"{self.queueDict[queue]['CEName']} at {self.queueDict[queue]['Site']}", - ) - return False - - # if we are here, it means that we are allowed to submit to the queue - return True - - def _getCE(self, queue): - """Prepare the queue description to look for eligible jobs - - :param str queue: queue name - - :return: ce (ComputingElement object), ceDict (dict) - """ - - ce = self.queueDict[queue]["CE"] - ceDict = ce.ceParameters - ceDict["GridCE"] = self.queueDict[queue]["CEName"] - - if self.queueDict[queue]["Site"] not in self.siteMaskList: - ceDict["JobType"] = "Test" - ceDict["Community"] = self.vo - - if self.checkPlatform: - platform = self.queueDict[queue]["ParametersDict"].get("Platform") - if not platform: - self.log.error(f"No platform set for CE {ce}, returning 'ANY'") - ceDict["Platform"] = "ANY" - return ce, ceDict - result = self.resourcesModule.getCompatiblePlatforms(platform) - if result["OK"]: - ceDict["Platform"] = result["Value"] - else: - self.log.error( - "Issue getting compatible platforms, returning 'ANY'", f"{self.platforms}: {result['Message']}" - ) - ceDict["Platform"] = "ANY" - - return ce, ceDict - - def _getPilotsWeMayWantToSubmit(self, ceDict): - """Returns the number of pilots that we may want to submit to the ce described in ceDict - - This implementation is based on the number of eligible WMS taskQueues for the target site/queue. - VOs are free to override this method and to provide a different implementation. - - :param ceDict: dictionary describing CE - :type ceDict: dict - - :return: pilotsWeMayWantToSubmit (int), taskQueueDict (dict) - :rType: tuple - """ - - pilotsWeMayWantToSubmit = 0 + return 0 + totalJobs = result["Value"] - result = self.matcherClient.getMatchingTaskQueues(ceDict) + # Get the number of waiting pilots + result = self.pilotAgentsDB.countPilots( + {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_WAITING_STATES} + ) if not result["OK"]: - self.log.error("Could not retrieve TaskQueues from TaskQueueDB", result["Message"]) - return 0, {} - taskQueueDict = result["Value"] - if not taskQueueDict: - self.log.verbose("No matching TQs found", f"for {ceDict}") + self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") + self.failedQueues[queue] += 1 + return 0 + waitingJobs = result["Value"] - for tq in taskQueueDict.values(): - pilotsWeMayWantToSubmit += tq["Jobs"] + runningJobs = totalJobs - waitingJobs + self.log.info( + "PilotAgentsDB report", + f"({ceName}_{queueName}): Wait={waitingJobs}, Run={runningJobs}, Max={maxTotalJobs}", + ) - return pilotsWeMayWantToSubmit, taskQueueDict + totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) + return totalSlots - def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): + def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str): """Method that really submits the pilots to the ComputingElements' queue :param pilotsToSubmit: number of pilots to submit. - :type pilotsToSubmit: int :param ce: computing element object to where we submit - :type ce: ComputingElement - :param str queue: queue where to submit + :param queue: queue where to submit :return: S_OK/S_ERROR. If S_OK, returns tuple with (pilotList, stampDict) @@ -711,30 +432,35 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): """ self.log.info("Going to submit pilots", f"(a maximum of {pilotsToSubmit} pilots to {queue} queue)") + # Get parameters to generate the pilot executable bundleProxy = self.queueDict[queue].get("BundleProxy", False) proxy = None if bundleProxy: proxy = ce.proxy - jobExecDir = self.queueDict[queue]["ParametersDict"].get("JobExecDir", "") envVariables = self.queueDict[queue]["ParametersDict"].get("EnvironmentVariables", None) - executable = self.getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables) + # Generate the executable + executable = self._getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables) + # Submit the job submitResult = ce.submitJob(executable, "", pilotsToSubmit) # In case the CE does not need the executable after the submission, we delete it # Else, we keep it, the CE will delete it after the end of the pilot execution if submitResult.get("ExecutableToKeep") != executable: os.unlink(executable) + siteName = self.queueDict[queue]["Site"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] if not submitResult["OK"]: self.log.error("Failed submission to queue", f"Queue {queue}:\n{submitResult['Message']}") if self.sendSubmissionAccounting: - result = self.sendPilotSubmissionAccounting( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionAccounting( + siteName, + ceName, + queueName, pilotsToSubmit, 0, "Failed", @@ -743,10 +469,10 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.log.error("Failure submitting Accounting report", result["Message"]) if self.sendSubmissionMonitoring: - result = self.sendPilotSubmissionMonitoring( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionMonitoring( + siteName, + ceName, + queueName, pilotsToSubmit, 0, "Failed", @@ -757,20 +483,18 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.failedQueues[queue] += 1 return submitResult - # Add pilots to the PilotAgentsDB: assign pilots to TaskQueue proportionally to the task queue priorities + # Add pilots to the PilotAgentsDB pilotList = submitResult["Value"] - self.queueSlots[queue]["AvailableSlots"] -= len(pilotList) - self.totalSubmittedPilots += len(pilotList) self.log.info( f"Submitted {len(pilotList)} pilots to {self.queueDict[queue]['QueueName']}@{self.queueDict[queue]['CEName']}" ) stampDict = submitResult.get("PilotStampDict", {}) if self.sendSubmissionAccounting: - result = self.sendPilotSubmissionAccounting( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionAccounting( + siteName, + ceName, + queueName, len(pilotList), len(pilotList), "Succeeded", @@ -779,10 +503,10 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.log.error("Failure submitting Accounting report", result["Message"]) if self.sendSubmissionMonitoring: - result = self.sendPilotSubmissionMonitoring( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionMonitoring( + siteName, + ceName, + queueName, len(pilotList), len(pilotList), "Succeeded", @@ -792,156 +516,50 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): return S_OK((pilotList, stampDict)) - def _addPilotTQReference(self, queue, taskQueueDict, pilotList, stampDict): - """Add to pilotAgentsDB the reference of for which TqID the pilots have been sent + def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[str, str]): + """Add pilotReference to pilotAgentsDB - :param str queue: the queue name - :param taskQueueDict: dict of task queues - :type taskQueueDict: dict + :param queue: the queue name :param pilotList: list of pilots - :type pilotList: list :param stampDict: dictionary of pilots timestamps - :type stampDict: dict - - :return: None """ + result = self.pilotAgentsDB.addPilotReferences( + pilotList, + self.pilotGroup, + self.queueDict[queue]["CEType"], + stampDict, + ) + if not result["OK"]: + self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"]) + return result - tqPriorityList = [] - sumPriority = 0.0 - for tq in taskQueueDict: - sumPriority += taskQueueDict[tq]["Priority"] - tqPriorityList.append((tq, sumPriority)) - tqDict = {} - for pilotID in pilotList: - rndm = random.random() * sumPriority - for tq, prio in tqPriorityList: - if rndm < prio: - tqID = tq - break - if tqID not in tqDict: - tqDict[tqID] = [] - tqDict[tqID].append(pilotID) - - for tqID, pilotsList in tqDict.items(): - result = self.pilotAgentsDB.addPilotTQReference( - pilotsList, - tqID, - self.pilotGroup, - self.queueDict[queue]["CEType"], - stampDict, + for pilot in pilotList: + result = self.pilotAgentsDB.setPilotStatus( + pilot, + PilotStatus.SUBMITTED, + self.queueDict[queue]["CEName"], + "Successfully submitted by the SiteDirector", + self.queueDict[queue]["Site"], + self.queueDict[queue]["QueueName"], ) if not result["OK"]: - self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"]) - continue - for pilot in pilotsList: - result = self.pilotAgentsDB.setPilotStatus( - pilot, - PilotStatus.SUBMITTED, - self.queueDict[queue]["CEName"], - "Successfully submitted by the SiteDirector", - self.queueDict[queue]["Site"], - self.queueDict[queue]["QueueName"], - ) - if not result["OK"]: - self.log.error("Failed to set pilot status", result["Message"]) - continue - - def getQueueSlots(self, queue, manyWaitingPilotsFlag): - """Get the number of available slots in the queue""" - ce = self.queueDict[queue]["CE"] - ceName = self.queueDict[queue]["CEName"] - queueName = self.queueDict[queue]["QueueName"] - queryCEFlag = self.queueDict[queue]["QueryCEFlag"].lower() in ["1", "yes", "true"] - - self.queueSlots.setdefault(queue, {}) - totalSlots = self.queueSlots[queue].get("AvailableSlots", 0) - - # See if there are waiting pilots for this queue. If not, allow submission - if totalSlots and manyWaitingPilotsFlag: - result = self.pilotAgentsDB.selectPilots( - {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_WAITING_STATES} - ) - if result["OK"]: - jobIDList = result["Value"] - if not jobIDList: - return totalSlots - return 0 - - availableSlotsCount = self.queueSlots[queue].setdefault("AvailableSlotsCount", 0) - waitingJobs = 1 - if totalSlots == 0: - if availableSlotsCount % self.availableSlotsUpdateCycleFactor == 0: - # Get the list of already existing pilots for this queue - jobIDList = None - result = self.pilotAgentsDB.selectPilots( - {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_TRANSIENT_STATES} - ) - - if result["OK"]: - jobIDList = result["Value"] - - if queryCEFlag: - result = ce.available(jobIDList) - if not result["OK"]: - self.log.warn("Failed to check the availability of queue", f"{queue}: \n{result['Message']}") - self.failedQueues[queue] += 1 - else: - ceInfoDict = result["CEInfoDict"] - self.log.info( - "CE queue report", - f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Submitted={ceInfoDict['SubmittedJobs']}, Max={ceInfoDict['MaxTotalJobs']}", - ) - totalSlots = result["Value"] - self.queueSlots[queue]["AvailableSlots"] = totalSlots - waitingJobs = ceInfoDict["WaitingJobs"] - else: - maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10)) - maxTotalJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxTotalJobs", 10)) - waitingToRunningRatio = float( - self.queueDict[queue]["ParametersDict"].get("WaitingToRunningRatio", 0.0) - ) - waitingJobs = 0 - totalJobs = 0 - if jobIDList: - result = self.pilotAgentsDB.getPilotInfo(jobIDList) - if not result["OK"]: - self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") - self.failedQueues[queue] += 1 - else: - for _pilotRef, pilotDict in result["Value"].items(): - if pilotDict["Status"] in PilotStatus.PILOT_TRANSIENT_STATES: - totalJobs += 1 - if pilotDict["Status"] in PilotStatus.PILOT_WAITING_STATES: - waitingJobs += 1 - runningJobs = totalJobs - waitingJobs - self.log.info( - "PilotAgentsDB report", - f"({ceName}_{queueName}): Wait={waitingJobs}, Run={runningJobs}, Max={maxTotalJobs}", - ) - maxWaitingJobs = int(max(maxWaitingJobs, runningJobs * waitingToRunningRatio)) - - totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) - self.queueSlots[queue]["AvailableSlots"] = max(totalSlots, 0) - - self.queueSlots[queue]["AvailableSlotsCount"] += 1 - - if manyWaitingPilotsFlag and waitingJobs: - return 0 - return totalSlots + self.log.error("Failed to set pilot status", result["Message"]) + return result + return S_OK() - ##################################################################################### - def getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **kwargs): + def _getExecutable( + self, queue: str, proxy: X509Chain = None, jobExecDir: str = "", envVariables: dict[str, str] = None + ): """Prepare the full executable for queue - :param str queue: queue name - :param bool proxy: flag that say if to bundle or not the proxy - :param str jobExecDir: pilot execution dir (normally an empty string) + :param queue: queue name + :param proxy: flag that say if to bundle or not the proxy + :param jobExecDir: pilot execution dir (normally an empty string) :returns: a string the options for the pilot - :rtype: str """ - pilotOptions = self._getPilotOptions(queue, **kwargs) + pilotOptions = self._getPilotOptions(queue) if not pilotOptions: self.log.warn("Pilots will be submitted without additional options") pilotOptions = [] @@ -966,15 +584,12 @@ def getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **k ) return executable - ##################################################################################### - - def _getPilotOptions(self, queue, **kwargs): + def _getPilotOptions(self, queue: str) -> list[str]: """Prepare pilot options - :param str queue: queue name + :param queue: queue name :returns: pilotOptions is a list of strings, each one is an option to the dirac-pilot script invocation - :rtype: list """ queueDict = self.queueDict[queue]["ParametersDict"] pilotOptions = [] @@ -1047,18 +662,22 @@ def _getPilotOptions(self, queue, **kwargs): return pilotOptions - #################################################################################### - - def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExecDir="", envVariables=None): + def _writePilotScript( + self, + workingDirectory: str, + pilotOptions: str, + proxy: X509Chain = None, + pilotExecDir: str = "", + envVariables: dict[str, str] = None, + ): """Bundle together and write out the pilot executable script, admix the proxy if given - :param str workingDirectory: pilot wrapper working directory - :param str pilotOptions: options with which to start the pilot - :param str proxy: proxy file we are going to bundle - :param str pilotExecDir: pilot executing directory + :param workingDirectory: pilot wrapper working directory + :param pilotOptions: options with which to start the pilot + :param proxy: proxy file we are going to bundle + :param pilotExecDir: pilot executing directory :returns: file name of the pilot wrapper created - :rtype: str """ try: @@ -1077,106 +696,36 @@ def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExe return _writePilotWrapperFile(workingDirectory=workingDirectory, localPilot=localPilot) - def updatePilotStatus(self): - """Update status of pilots in transient and final states""" + ##################################################################################### - # Generate a proxy before feeding the threads to renew the ones of the CEs to perform actions - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, 23400) - if not result["OK"]: - return result - proxy = result["Value"] + def monitorPilots(self): + """Update status of pilots in transient and final states""" + self.log.verbose("Monitoring: Queues treated are", ",".join(self.queueDict)) # Getting the status of pilots in a queue implies the use of remote CEs and may lead to network latency # Threads aim at overcoming such issues and thus 1 thread per queue is created to # update the status of pilots in transient states + errors = [] with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor: futures = [] for queue in self.queueDict: - futures.append(executor.submit(self._updatePilotStatusPerQueue, queue, proxy)) - for res in as_completed(futures): - err = res.exception() - if err: - self.log.exception("Update pilot status thread failed", lException=err) - - # The pilot can be in Done state set by the job agent check if the output is retrieved - for queue in self.queueDict: - ce = self.queueDict[queue]["CE"] + futures.append(executor.submit(self._monitorPilotsPerQueue, queue)) - if not ce.isProxyValid(120)["OK"]: - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, 1000) + for future in as_completed(futures): + result = future.result() if not result["OK"]: - return result - proxy = result["Value"] - ce.setProxy(proxy, 940) - - if callable(getattr(ce, "cleanupPilots", None)): - ce.cleanupPilots() - - ceName = self.queueDict[queue]["CEName"] - queueName = self.queueDict[queue]["QueueName"] - ceType = self.queueDict[queue]["CEType"] - siteName = self.queueDict[queue]["Site"] - result = self.pilotAgentsDB.selectPilots( - { - "DestinationSite": ceName, - "Queue": queueName, - "GridType": ceType, - "GridSite": siteName, - "OutputReady": "False", - "Status": PilotStatus.PILOT_FINAL_STATES, - } - ) - - if not result["OK"]: - self.log.error("Failed to select pilots", result["Message"]) - continue - pilotRefs = result["Value"] - if not pilotRefs: - continue - result = self.pilotAgentsDB.getPilotInfo(pilotRefs) - if not result["OK"]: - self.log.error("Failed to get pilots info from DB", result["Message"]) - continue - pilotDict = result["Value"] - if self.getOutput: - for pRef in pilotRefs: - self._getPilotOutput(pRef, pilotDict, ce, ceName) - - # Check if the accounting is to be sent - if self.sendAccounting: - result = self.pilotAgentsDB.selectPilots( - { - "DestinationSite": ceName, - "Queue": queueName, - "GridType": ceType, - "GridSite": siteName, - "AccountingSent": "False", - "Status": PilotStatus.PILOT_FINAL_STATES, - } - ) + errors.append(result["Message"]) - if not result["OK"]: - self.log.error("Failed to select pilots", result["Message"]) - continue - pilotRefs = result["Value"] - if not pilotRefs: - continue - result = self.pilotAgentsDB.getPilotInfo(pilotRefs) - if not result["OK"]: - self.log.error("Failed to get pilots info from DB", result["Message"]) - continue - pilotDict = result["Value"] - result = self.sendPilotAccounting(pilotDict) - if not result["OK"]: - self.log.error("Failed to send pilot agent accounting") + if errors: + self.log.error("The following errors occurred during the pilot monitoring operation", "\n".join(errors)) + return S_ERROR("Pilot monitoring: errors occurred") return S_OK() - def _updatePilotStatusPerQueue(self, queue, proxy): + def _monitorPilotsPerQueue(self, queue: str): """Update status of pilots in transient state for a given queue :param queue: queue name - :param proxy: proxy to check the pilot status and renewals """ ce = self.queueDict[queue]["CE"] ceName = self.queueDict[queue]["CEName"] @@ -1184,6 +733,7 @@ def _updatePilotStatusPerQueue(self, queue, proxy): ceType = self.queueDict[queue]["CEType"] siteName = self.queueDict[queue]["Site"] + # Select pilots in a transient states result = self.pilotAgentsDB.selectPilots( { "DestinationSite": ceName, @@ -1196,134 +746,188 @@ def _updatePilotStatusPerQueue(self, queue, proxy): ) if not result["OK"]: self.log.error("Failed to select pilots", f": {result['Message']}") - return + return result pilotRefs = result["Value"] if not pilotRefs: - return + return S_OK() + # Get their information result = self.pilotAgentsDB.getPilotInfo(pilotRefs) if not result["OK"]: self.log.error("Failed to get pilots info from DB", result["Message"]) - return + return result pilotDict = result["Value"] - stampedPilotRefs = [] - for pRef in pilotDict: - if pilotDict[pRef]["PilotStamp"]: - stampedPilotRefs.append(pRef + ":::" + pilotDict[pRef]["PilotStamp"]) - else: - stampedPilotRefs = list(pilotRefs) - break - - # This proxy is used for checking the pilot status and renewals + # The proxy is used for checking the pilot status and renewals # We really need at least a few hours otherwise the renewed # proxy may expire before we check again... - result = ce.isProxyValid(3 * 3600) + result = self._setCredentials(ce, 3 * 3600) if not result["OK"]: - ce.setProxy(proxy, 23300) - - # Get valid token if needed - if self.__supportToken(ce): - result = self.__getPilotToken(audience=ce.audienceName) - if not result["OK"]: - self.log.error("Failed to get token", f"{ceName}: {result['Message']}") - return - ce.setToken(result["Value"]) + self.log.error("Failed to set credentials:", result["Message"]) + return result - result = ce.getJobStatus(stampedPilotRefs) + # Get an update of the pilot by interrogating the CEs + result = ce.getJobStatus(pilotRefs) if not result["OK"]: self.log.error("Failed to get pilots status from CE", f"{ceName}: {result['Message']}") - return + return result pilotCEDict = result["Value"] - abortedPilots, getPilotOutput = self._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) - for pRef in getPilotOutput: - self._getPilotOutput(pRef, pilotDict, ce, ceName) - + # Get updated pilots + updatedPilots = self._getUpdatedPilotStatus(pilotDict, pilotCEDict) # If something wrong in the queue, make a pause for the job submission + abortedPilots = self._getAbortedPilots(updatedPilots) if abortedPilots: self.failedQueues[queue] += 1 + # Update the status of the pilots in the DB + self._updatePilotsInDB(updatedPilots) - def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict): - """Really updates the pilots status + # FIXME: seems like it is only used by the CloudCE? Couldn't it be called from CloudCE.getJobStatus()? + if callable(getattr(ce, "cleanupPilots", None)): + ce.cleanupPilots() - :return: number of aborted pilots, flag for getting the pilot output - """ + # Check if the accounting is to be sent + if self.sendAccounting: + result = self.pilotAgentsDB.selectPilots( + { + "DestinationSite": ceName, + "Queue": queueName, + "GridType": ceType, + "GridSite": siteName, + "AccountingSent": "False", + "Status": PilotStatus.PILOT_FINAL_STATES, + } + ) + if not result["OK"]: + self.log.error("Failed to select pilots", result["Message"]) + return result + + pilotRefs = result["Value"] + if not pilotRefs: + return S_OK() - abortedPilots = 0 - getPilotOutput = [] + result = self.pilotAgentsDB.getPilotInfo(pilotRefs) + if not result["OK"]: + self.log.error("Failed to get pilots info from DB", result["Message"]) + return result - for pRef in pilotRefs: - newStatus = "" - oldStatus = pilotDict[pRef]["Status"] - lastUpdateTime = pilotDict[pRef]["LastUpdateTime"] - sinceLastUpdate = datetime.datetime.utcnow() - lastUpdateTime + pilotDict = result["Value"] + result = self._sendPilotAccounting(pilotDict) + if not result["OK"]: + self.log.error("Failed to send pilot agent accounting") + return result - ceStatus = pilotCEDict.get(pRef, oldStatus) + return S_OK() - if oldStatus == ceStatus and ceStatus != PilotStatus.UNKNOWN: - # Normal status did not change, continue + def _getUpdatedPilotStatus(self, pilotDict: dict[str, Any], pilotCEDict: dict[str, Any]) -> dict[str, str]: + """Get the updated pilots, from a list of pilots, and their new status""" + updatedPilots = {} + for pilotReference, pilotInfo in pilotDict.items(): + oldStatus = pilotInfo["Status"] + sinceLastUpdate = datetime.datetime.utcnow() - pilotInfo["LastUpdateTime"] + ceStatus = pilotCEDict.get(pilotReference, oldStatus) + + if oldStatus != ceStatus: + # Normal case: update the pilot status to the new value + updatedPilots[pilotReference] = ceStatus continue - if ceStatus == oldStatus == PilotStatus.UNKNOWN: - if sinceLastUpdate < 3600 * second: - # Allow 1 hour of Unknown status assuming temporary problems on the CE - continue - newStatus = PilotStatus.ABORTED - elif ceStatus == PilotStatus.UNKNOWN and oldStatus not in PilotStatus.PILOT_FINAL_STATES: - # Possible problems on the CE, let's keep the Unknown status for a while - newStatus = PilotStatus.UNKNOWN - elif ceStatus != PilotStatus.UNKNOWN: - # Update the pilot status to the new value - newStatus = ceStatus - - if newStatus: - self.log.info("Updating status", f"to {newStatus} for pilot {pRef}") - result = self.pilotAgentsDB.setPilotStatus(pRef, newStatus, "", "Updated by SiteDirector") - if not result["OK"]: - self.log.error(result["Message"]) - if newStatus == "Aborted": - abortedPilots += 1 - # Set the flag to retrieve the pilot output now or not - if newStatus in PilotStatus.PILOT_FINAL_STATES: - if pilotDict[pRef]["OutputReady"].lower() == "false" and self.getOutput: - getPilotOutput.append(pRef) - - return abortedPilots, getPilotOutput - - def _getPilotOutput(self, pRef, pilotDict, ce, ceName): - """Retrieves the pilot output for a pilot and stores it in the pilotAgentsDB""" - self.log.info(f"Retrieving output for pilot {pRef}") - output = None - error = None - - pilotStamp = pilotDict[pRef]["PilotStamp"] - pRefStamp = pRef - if pilotStamp: - pRefStamp = pRef + ":::" + pilotStamp - - result = ce.getJobOutput(pRefStamp) - if not result["OK"]: - self.failedPilotOutput[pRefStamp] += 1 - self.log.error("Failed to get pilot output", f"{ceName}: {result['Message']}") - self.log.verbose(f"Retries left: {max(0, self.maxRetryGetPilotOutput - self.failedPilotOutput[pRefStamp])}") - - if (self.maxRetryGetPilotOutput - self.failedPilotOutput[pRefStamp]) <= 0: - output = "Output is no longer available" - error = "Error is no longer available" - self.failedPilotOutput.pop(pRefStamp) - else: - return - else: - output, error = result["Value"] - if output: - result = self.pilotAgentsDB.storePilotOutput(pRef, output, error) + if oldStatus == ceStatus and ceStatus == PilotStatus.UNKNOWN and sinceLastUpdate > 3600 * second: + # Pilots are not reachable, we mark them as aborted + updatedPilots[pilotReference] = PilotStatus.ABORTED + continue + + return updatedPilots + + def _getAbortedPilots(self, pilotsDict: dict[str, str]) -> list[str]: + """Get aborted pilots from a list of pilots and their status""" + abortedPilots = [] + for pilotReference, status in pilotsDict.items(): + if status == PilotStatus.ABORTED: + abortedPilots.append(pilotReference) + + return abortedPilots + + def _updatePilotsInDB(self, updatedPilotsDict: dict[str, str]): + """Update the status of the pilots in the DB""" + for pilotReference, newStatus in updatedPilotsDict.items(): + self.log.info("Updating status", f"to {newStatus} for pilot {pilotReference}") + result = self.pilotAgentsDB.setPilotStatus(pilotReference, newStatus, "", "Updated by SiteDirector") if not result["OK"]: - self.log.error("Failed to store pilot output", result["Message"]) - else: - self.log.warn("Empty pilot output not stored to PilotDB") + self.log.error(result["Message"]) + + ##################################################################################### + + def __getPilotToken(self, audience: str, scope: list[str] = None): + """Get the token corresponding to the pilot user identity - def sendPilotAccounting(self, pilotDict): + :param audience: Token audience, targeting a single CE + :param scope: list of permissions needed to interact with a CE + :return: S_OK/S_ERROR, Token object as Value + """ + if not audience: + return S_ERROR("Audience is not defined") + + if not scope: + scope = PILOT_SCOPES + + return gTokenManager.getToken(userGroup=self.pilotGroup, requiredTimeLeft=600, scope=scope, audience=audience) + + def __supportToken(self, ce: ComputingElement) -> bool: + """Check whether the SiteDirector is able to submit pilots with tokens. + + * the CE is able to receive any token. Validation: Tag = Token should be included in the CE parameters. + * the CE is able to receive VO-specifc tokens. Validation: Tag = Token: should be included in the CE parameters. + """ + return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", []) + + def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int): + """ + + :param ce: ComputingElement instance + :param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy + :param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token + """ + getNewProxy = False + + # If the CE does not already embed a proxy, we need one + if not ce.proxy: + getNewProxy = True + + # If the CE embeds a proxy that is too short to perform a given operation, we need a new one + if ce.proxy: + result = ce.proxy.getRemainingSecs() + if not result["OK"]: + return result + + if result["Value"] < proxyMinimumRequiredValidity: + getNewProxy = True + + # Generate a new proxy if needed + if getNewProxy: + self.log.verbose( + "Getting pilot proxy", f"for {self.pilotDN}/{self.pilotGroup} {proxyMinimumRequiredValidity} long" + ) + result = gProxyManager.getPilotProxyFromDIRACGroup( + self.pilotDN, self.pilotGroup, proxyMinimumRequiredValidity + ) + if not result["OK"]: + return result + ce.setProxy(result["Value"]) + + # Get valid token if needed + if self.__supportToken(ce): + result = self.__getPilotToken(audience=ce.audienceName) + if not result["OK"]: + self.log.error("Failed to get token", f"{ce.ceName}: {result['Message']}") + return + ce.setToken(result["Value"]) + + return S_OK() + + ##################################################################################### + + def _sendPilotAccounting(self, pilotDict: dict[str, Any]): """Send pilot accounting record""" for pRef in pilotDict: self.log.verbose("Preparing accounting record", f"for pilot {pRef}") @@ -1374,15 +978,17 @@ def sendPilotAccounting(self, pilotDict): return S_OK() - def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionAccounting( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Send pilot submission accounting record - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ @@ -1391,11 +997,7 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n pA.setStartTime(datetime.datetime.utcnow()) pA.setEndTime(datetime.datetime.utcnow()) pA.setValueByKey("HostName", DIRAC.siteName()) - if hasattr(self, "_AgentModule__moduleProperties"): - pA.setValueByKey("SiteDirector", self.am_getModuleParam("agentName")) - else: # In case it is not executed as agent - pA.setValueByKey("SiteDirector", "Client") - + pA.setValueByKey("SiteDirector", self.am_getModuleParam("agentName")) pA.setValueByKey("Site", siteName) pA.setValueByKey("CE", ceName) pA.setValueByKey("Queue", ceName + ":" + queueName) @@ -1415,29 +1017,26 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n return result return S_OK() - def sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionMonitoring( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Sends pilot submission records to monitoring - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotSubmissionMonitoring") - if hasattr(self, "_AgentModule__moduleProperties"): - siteDirName = self.am_getModuleParam("agentName") - else: # In case it is not executed as agent - siteDirName = "Client" - pilotMonitoringData = { "HostName": DIRAC.siteName(), - "SiteDirector": siteDirName, + "SiteDirector": self.am_getModuleParam("agentName"), "Site": siteName, "CE": ceName, "Queue": ceName + ":" + queueName, diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index 0d842dc1ea8..96582bab4f5 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -43,7 +43,7 @@ class StatesAccountingAgent(AgentModule): __renameFieldsMapping = {"JobType": "JobSplitType"} # PilotsHistory fields - __pilotsMapping = ["TaskQueueID", "GridSite", "GridType", "Status", "NumOfPilots"] + __pilotsMapping = ["GridSite", "GridType", "Status", "NumOfPilots"] def initialize(self): """Standard initialization""" diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index ad343c7e89a..9d00e622243 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -2,214 +2,271 @@ """ # pylint: disable=protected-access -# imports import datetime +import os import pytest -from unittest.mock import MagicMock +from diraccfg import CFG -from DIRAC import gLogger +from DIRAC import gLogger, gConfig +from DIRAC.ConfigurationSystem.Client import ConfigurationData +from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus -# sut from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES -mockAM = MagicMock() -mockGCReply = MagicMock() -mockGCReply.return_value = "TestSetup" -mockOPSObject = MagicMock() -mockOPSObject.getValue.return_value = "123" -mockOPSReply = MagicMock() -mockOPSReply.return_value = "123" -mockOPS = MagicMock() -mockOPS.return_value = mockOPSObject -# mockOPS.Operations = mockOPSObject -mockPM = MagicMock() -mockPM.requestToken.return_value = {"OK": True, "Value": ("token", 1)} -mockPMReply = MagicMock() -mockPMReply.return_value = {"OK": True, "Value": ("token", 1)} - -mockCSGlobalReply = MagicMock() -mockCSGlobalReply.return_value = "TestSetup" -mockResourcesReply = MagicMock() -mockResourcesReply.return_value = {"OK": True, "Value": ["x86_64-slc6", "x86_64-slc5"]} +CONFIG = """ +Resources +{ + Sites + { + LCG + { + LCG.Site1.com + { + VO = dteam + CEs + { + ce1.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + ce2.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + } + } + } + LCG.Site2.site2 + { + CEs + { + ce1.site2.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + DIRAC + { + DIRAC.Site3.site3 + { + CEs + { + ce1.site3.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + } +} +""" -mockPilotAgentsDB = MagicMock() -mockPilotAgentsDB.setPilotStatus.return_value = {"OK": True} -gLogger.setLevel("DEBUG") +@pytest.fixture +def config(): + """Load a fake configuration""" + ConfigurationData.localCFG = CFG() + cfg = CFG() + cfg.loadFromBuffer(CONFIG) + gConfig.loadCFG(cfg) @pytest.fixture -def sd(mocker): - """mocker for SiteDirector""" +def sd(mocker, config): + """Basic configuration of a SiteDirector. It tests the _buildQueueDict() method at the same time""" mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule.__init__") - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", side_effect=mockGCReply) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations", side_effect=mockOPS) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations.getValue", return_value="123") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.getPilotAgentsDB") + + usableSites = ( + gConfig.getSections("Resources/Sites/LCG")["Value"] + gConfig.getSections("Resources/Sites/DIRAC")["Value"] + ) mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.requestToken", side_effect=mockPMReply + "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.SiteStatus.getUsableSites", return_values=usableSites ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule", side_effect=mockAM) sd = SiteDirector() + + # Set logger sd.log = gLogger - sd.am_getOption = mockAM sd.log.setLevel("DEBUG") - sd.rpcMatcher = MagicMock() - sd.rssClient = MagicMock() - sd.pilotAgentsDB = MagicMock() + + # Set basic parameters sd.workingDirectory = "" - sd.queueDict = { - "aQueue": { - "Site": "LCG.CERN.cern", - "CEName": "aCE", - "CEType": "SSH", - "QueueName": "aQueue", - "ParametersDict": { - "CPUTime": 12345, - "Community": "lhcb", - "OwnerGroup": ["lhcb_user"], - "Setup": "LHCb-Production", - "Site": "LCG.CERN.cern", - }, - } - } + + # Set VO + sd.vo = "dteam" + + # Set queueDict + sd.siteClient = SiteStatus() + sd._buildQueueDict() return sd -def test__getPilotOptions(sd): - """Testing SiteDirector()._getPilotOptions()""" - res = sd._getPilotOptions("aQueue") - assert {"-S TestSetup", "-V 123", "-l 123", "-n LCG.CERN.cern"} <= set(res) - - -@pytest.mark.parametrize( - "mockMatcherReturnValue, expected, anyExpected, sitesExpected", - [ - ({"OK": False, "Message": "boh"}, False, True, set()), - ({"OK": True, "Value": None}, False, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10}, "2": {"Jobs": 20}}}, True, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1"]}, "2": {"Jobs": 20}}}, True, True, {"Site1"}), - ( - {"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20}}}, - True, - True, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Site1"]}}, - }, - True, - False, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY"]}}, - }, - True, - False, - {"Site1", "Site2", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Any", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "Any"}, - ), - ( - { - "OK": True, - "Value": { - "1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, - "2": {"Jobs": 20, "Sites": ["NotAny", "Site2"]}, - }, - }, - True, - False, - {"Site1", "Site2", "NotAny"}, - ), - ], -) -def test__ifAndWhereToSubmit(sd, mockMatcherReturnValue, expected, anyExpected, sitesExpected): - """Testing SiteDirector()._ifAndWhereToSubmit()""" - sd.matcherClient = MagicMock() - sd.matcherClient.getMatchingTaskQueues.return_value = mockMatcherReturnValue - res = sd._ifAndWhereToSubmit() - assert res[0] == expected - if res[0]: - assert res == (expected, anyExpected, sitesExpected, set()) - - -def test__allowedToSubmit(sd): - """Testing SiteDirector()._allowedToSubmit()""" - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.siteMaskList = ["LCG.CERN.cern", "DIRAC.CNAF.it"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - sd.rssFlag = True - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.ceMaskList = ["aCE", "anotherCE"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - -def test__submitPilotsToQueue(sd): - """Testing SiteDirector()._submitPilotsToQueue()""" - # Create a MagicMock that does not have the workingDirectory - # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes) - # This is to use the SiteDirector's working directory, not the CE one - ceMock = MagicMock() - del ceMock.workingDirectory - - sd.queueCECache = {"aQueue": {"CE": ceMock}} - sd.queueSlots = {"aQueue": {"AvailableSlots": 10}} - assert sd._submitPilotsToQueue(1, MagicMock(), "aQueue")["OK"] - - -@pytest.mark.parametrize( - "pilotRefs, pilotDict, pilotCEDict, expected", - [ - ([], {}, {}, (0, [])), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Running"}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Unknown"}, - (0, []), - ), - ], -) -def test__updatePilotStatus(sd, pilotRefs, pilotDict, pilotCEDict, expected): - """Testing SiteDirector()._updatePilotStatus()""" - res = sd._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) - assert res == expected +@pytest.fixture(scope="session") +def pilotWrapperDirectory(tmp_path_factory): + """Create a temporary directory""" + fn = tmp_path_factory.mktemp("pilotWrappers") + return fn + + +def test_loadSubmissionPolicy(sd): + """Load each submission policy and call it""" + for submissionPolicyName in SUBMISSION_POLICIES: + # Load the submission policy + sd.submissionPolicyName = submissionPolicyName + res = sd._loadSubmissionPolicy() + assert res["OK"] + + # Call the submission policy with predefined parameters + targetQueue = "ce1.site1.com_condor" + res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters) + assert res >= 0 and res <= 50 + + +def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory): + """Get pilot options for a specific queue and check the result, then generate the pilot wrapper""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", return_value="TestSetup") + + # Get pilot options + pilotOptions = sd._getPilotOptions("ce1.site1.com_condor") + assert { + "--preinstalledEnv=123", + "--pythonVersion=3", + "--wnVO=dteam", + "-n LCG.Site1.com", + "-N ce1.site1.com", + "-Q condor", + "-S TestSetup", + "-V 123", + "-l 123", + "-e 1,2,3", + } == set(pilotOptions) + + # Write pilot script + res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions) + + # Make sure the file exists + assert os.path.exists(res) and os.path.isfile(res) + + +def test_updatePilotStatus(sd): + """Updating the status of some fake pilot references""" + # 1. We have not submitted any pilots, there is nothing to update + pilotDict = {} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 2. We just submitted a pilot, the remote system has not had the time to register the pilot + pilotDict["pilotRef1"] = {"Status": PilotStatus.SUBMITTED, "LastUpdateTime": datetime.datetime.utcnow()} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 3. The pilot is now registered + pilotCEDict["pilotRef1"] = PilotStatus.SUBMITTED + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 4. The pilot waits in the queue of the remote CE + pilotCEDict["pilotRef1"] = PilotStatus.WAITING + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.WAITING} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.WAITING + + # 5. CE issue: the pilot status becomes unknown + pilotCEDict["pilotRef1"] = PilotStatus.UNKNOWN + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.UNKNOWN} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.UNKNOWN + + # 6. Engineers do not manage to fix the issue, the CE is still under maintenance + pilotDict["pilotRef1"]["LastUpdateTime"] = datetime.datetime.utcnow() - datetime.timedelta(seconds=3610) + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.ABORTED} + + res = sd._getAbortedPilots(res) + assert res == ["pilotRef1"] diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index fbbfb60f4bf..b8e84a85e2f 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -282,49 +282,33 @@ Agents VO = # VO treated (leave empty for auto-discovery) Community = - # Grid Environment (leave empty for auto-discovery) - GridEnv = # the DN of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotDN = # the group of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotGroup = - # List of sites that will be treated by this SiteDirector ("any" can refer to any Site defined in the CS) - Site = any - # List of CE types that will be treated by this SiteDirector ("any" can refer to any CE defined in the CS) - CETypes = any - # List of CEs that will be treated by this SiteDirector ("any" can refer to any type of CE defined in the CS) - CEs = any + # List of sites that will be treated by this SiteDirector (No value can refer to any Site defined in the CS) + Site = + # List of CEs that will be treated by this SiteDirector (No value can refer to any CE defined in the CS) + CEs = + # List of CE types that will be treated by this SiteDirector (No value can refer to any type of CE defined in the CS) + CETypes = # List of Tags that are required to be present in the CE/Queue definition Tags = + # How many cycles to skip if queue is not working + FailedQueueCycleFactor = 10 # The maximum length of a queue (in seconds). Default: 3 days MaxQueueLength = 259200 - # Log level of the pilots - PilotLogLevel = INFO # Max number of pilots to submit per cycle MaxPilotsToSubmit = 100 - # Check, or not, for the waiting pilots already submitted - PilotWaitingFlag = True - # How many cycels to skip if queue is not working - FailedQueueCycleFactor = 10 - # Every N cycles we update the pilots status - PilotStatusUpdateCycleFactor = 10 - # Every N cycles we update the number of available slots in the queues - AvailableSlotsUpdateCycleFactor = 10 - # Maximum number of times the Site Director is going to try to get a pilot output before stopping - MaxRetryGetPilotOutput = 3 - # To submit pilots to empty sites in any case - AddPilotsToEmptySites = False - # Should the SiteDirector consider platforms when deciding to submit pilots? - CheckPlatform = False - # Attribute used to define if the status of the pilots will be updated - UpdatePilotStatus = True - # Boolean value used to indicate if the pilot output will be or not retrieved - GetPilotOutput = False # Boolean value that indicates if the pilot job will send information for accounting SendPilotAccounting = True + # Submission policy to apply + SubmissionPolicy = WaitingSupportedJobs + # Working directory containing the pilot files if not set in the CE + WorkDirectory = } ##END ##BEGIN PushJobAgent diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index 657114f6f19..7d2e87056e9 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -5,7 +5,7 @@ Available methods are: - addPilotTQReference() + addPilotReferences() setPilotStatus() deletePilot() clearPilots() @@ -38,11 +38,9 @@ def __init__(self, parentLogger=None): self.lock = threading.Lock() ########################################################################################## - def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - - err = "PilotAgentsDB.addPilotTQReference: Failed to retrieve a new Id." + def addPilotReferences(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" for ref in pilotRef: stamp = "" if ref in pilotStampDict: @@ -50,9 +48,9 @@ def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC req = ( "INSERT INTO PilotAgents " - + "(PilotJobReference, TaskQueueID, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) " - + "VALUES ('%s',%d,'%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')" - % (ref, int(taskQueueID), ownerGroup, gridType, stamp) + + "(PilotJobReference, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) " + + "VALUES ('%s','%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')" + % (ref, ownerGroup, gridType, stamp) ) result = self._update(req) @@ -60,7 +58,7 @@ def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC return result if "lastRowId" not in result: - return S_ERROR(f"{err}") + return S_ERROR("PilotAgentsDB.addPilotReferences: Failed to retrieve a new Id.") return S_OK() @@ -278,7 +276,6 @@ def getPilotInfo(self, pilotRef=False, conn=False, paramNames=[], pilotID=False) "SubmissionTime", "PilotID", "LastUpdateTime", - "TaskQueueID", "GridSite", "PilotStamp", "Queue", @@ -470,31 +467,6 @@ def getJobsForPilot(self, pilotID): resDict[row[0]].append(row[1]) return S_OK(resDict) - ########################################################################################## - def getPilotsForTaskQueue(self, taskQueueID, gridType=None, limit=None): - """Get IDs of Pilot Agents that were submitted for the given taskQueue, - specify optionally the grid type, results are sorted by Submission time - an Optional limit can be set. - """ - - if gridType: - req = f"SELECT PilotID FROM PilotAgents WHERE TaskQueueID={taskQueueID} AND GridType='{gridType}' " - else: - req = f"SELECT PilotID FROM PilotAgents WHERE TaskQueueID={taskQueueID} " - - req += "ORDER BY SubmissionTime DESC " - - if limit: - req += f"LIMIT {limit}" - - result = self._query(req) - if not result["OK"]: - return result - if result["Value"]: - pilotList = [x[0] for x in result["Value"]] - return S_OK(pilotList) - return S_ERROR(f"PilotJobReferences for TaskQueueID {taskQueueID} not found") - ########################################################################################## def getPilotsForJobID(self, jobID): """Get ID of Pilot Agent that is running a given JobID""" @@ -1046,7 +1018,6 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems): "PilotID", "LastUpdateTime", "CurrentJobID", - "TaskQueueID", "GridSite", ] @@ -1084,7 +1055,7 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems): def getSummarySnapshot(self, requestedFields=False): """Get the summary snapshot for a given combination""" if not requestedFields: - requestedFields = ["TaskQueueID", "GridSite", "GridType", "Status"] + requestedFields = ["GridSite", "GridType", "Status"] valueFields = ["COUNT(PilotID)"] defString = ", ".join(requestedFields) valueString = ", ".join(valueFields) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql index 418ad4877d2..d3a244972af 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql @@ -29,7 +29,6 @@ CREATE TABLE `PilotAgents` ( `PilotID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, `InitialJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, `CurrentJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, - `TaskQueueID` INT(11) UNSIGNED NOT NULL DEFAULT 0, `PilotJobReference` VARCHAR(255) NOT NULL DEFAULT 'Unknown', `PilotStamp` VARCHAR(32) NOT NULL DEFAULT '', `DestinationSite` VARCHAR(128) NOT NULL DEFAULT 'NotAssigned', diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index dcf4c053eed..06aaee3de0a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -74,23 +74,12 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) ########################################################################################## - types_addPilotTQReference = [list, int, str, str] + types_addPilotReferences = [list, str] - @deprecated("Use addPilotTQRef") @classmethod - def export_addPilotTQReference( - cls, pilotRef, taskQueueID, ownerDN, ownerGroup, broker="Unknown", gridType="DIRAC", pilotStampDict={} - ): + def export_addPilotReferences(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): """Add a new pilot job reference""" - - return cls.pilotAgentsDB.addPilotTQReference(pilotRef, taskQueueID, ownerGroup, gridType, pilotStampDict) - - types_addPilotTQRef = [list, int, str] - - @classmethod - def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotTQReference(pilotRef, taskQueueID, ownerGroup, gridType, pilotStampDict) + return cls.pilotAgentsDB.addPilotReferences(pilotRef, ownerGroup, gridType, pilotStampDict) ############################################################################## types_getPilotOutput = [str] @@ -347,41 +336,12 @@ def export_getGroupedPilotSummary(cls, columnList): @classmethod def export_getPilots(cls, jobID): - """Get pilot references and their states for : - - those pilots submitted for the TQ where job is sitting - - (or) the pilots executing/having executed the Job - """ - - pilots = [] + """Get pilots executing/having executed the Job""" result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) - if not result["OK"]: - if result["Message"].find("not found") == -1: - return S_ERROR("Failed to get pilot: " + result["Message"]) - else: - pilots += result["Value"] - if not pilots: - # Pilots were not found try to look in the Task Queue - taskQueueID = 0 - try: - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") - if not result["OK"]: - return result - tqDB = result["Value"]() - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp}") - result = tqDB.getTaskQueueForJob(int(jobID)) - if result["OK"] and result["Value"]: - taskQueueID = result["Value"] - if taskQueueID: - result = cls.pilotAgentsDB.getPilotsForTaskQueue(taskQueueID, limit=10) - if not result["OK"]: - return S_ERROR("Failed to get pilot: " + result["Message"]) - pilots += result["Value"] - - if not pilots: - return S_ERROR("Failed to get pilot for Job %d" % int(jobID)) - - return cls.pilotAgentsDB.getPilotInfo(pilotID=pilots) + if not result["OK"] or not result["Value"]: + return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + + return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) ############################################################################## types_killPilot = [[str, list]] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index 7fa2f10c3d4..226c4183a38 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -11,9 +11,7 @@ from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -def getQueuesResolved( - siteDict, queueCECache, gridEnv=None, setup=None, workingDir="", checkPlatform=False, instantiateCEs=False -): +def getQueuesResolved(siteDict, queueCECache, vo=None, checkPlatform=False, instantiateCEs=False): """Get the list of relevant CEs (what is in siteDict) and their descriptions. The main goal of this method is to return a dictionary of queues """ @@ -24,7 +22,6 @@ def getQueuesResolved( for ce in siteDict[site]: ceDict = siteDict[site][ce] pilotRunDirectory = ceDict.get("PilotRunDirectory", "") - # ceMaxRAM = ceDict.get('MaxRAM', None) qDict = ceDict.pop("Queues") for queue in qDict: queueName = f"{ce}_{queue}" @@ -33,8 +30,8 @@ def getQueuesResolved( queueDict[queueName]["ParametersDict"]["Queue"] = queue queueDict[queueName]["ParametersDict"]["GridCE"] = ce queueDict[queueName]["ParametersDict"]["Site"] = site - queueDict[queueName]["ParametersDict"]["GridEnv"] = gridEnv - queueDict[queueName]["ParametersDict"]["Setup"] = setup + if vo: + queueDict[queueName]["ParametersDict"]["Community"] = vo # Evaluate the CPU limit of the queue according to the Glue convention computeQueueCPULimit(queueDict[queueName]["ParametersDict"]) @@ -80,7 +77,6 @@ def getQueuesResolved( queueDict[queueName]["CEType"] = ceDict["CEType"] queueDict[queueName]["Site"] = site queueDict[queueName]["QueueName"] = queue - queueDict[queueName]["QueryCEFlag"] = ceDict.get("QueryCEFlag", "false") if checkPlatform: setPlatform(ceDict, queueDict[queueName]["ParametersDict"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py index 61503a03dc0..ea816e68a88 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py @@ -169,11 +169,7 @@ def _setUpWorkloadCE(self, numberOfProcessorsPayload=1): if not result["OK"]: return result proxy = result["Value"]["chain"] - result = proxy.getRemainingSecs() - if not result["OK"]: - return result - lifetime_secs = result["Value"] - workloadCE.setProxy(proxy, lifetime_secs) + workloadCE.setProxy(proxy) return S_OK(workloadCE) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py new file mode 100644 index 00000000000..2ae82f6d34b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod + +from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient + + +# Submission policies +AGGRESSIVE_FILLING = "AggressiveFilling" +WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" +SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] + + +class SubmissionPolicy(ABC): + """Abstract class to define a submission strategy.""" + + @abstractmethod + def apply(self, availableSlots: int, **kwargs) -> int: + """Method to redefine in the concrete subclasses + + :param availableSlots: slots available for new pilots + """ + if availableSlots < 0: + raise RuntimeError("Available slots cannot be negative") + + +class AggressiveFillingPolicy(SubmissionPolicy): + def apply(self, availableSlots: int, **kwargs) -> int: + """All the available slots should be filled up. + Should be employed for sites that are always processing jobs. + + * Pros: would quickly fill up a queue + * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs + """ + super().apply(availableSlots, **kwargs) + return availableSlots + + +class WaitingSupportedJobsPolicy(SubmissionPolicy): + def __init__(self) -> None: + super().__init__() + self.matcherClient = MatcherClient() + + def apply(self, availableSlots: int, **kwargs) -> int: + """Fill up available slots only if waiting supported jobs exist. + Should be employed for sites that are used from time to time (targeting specific Task Queues). + + * Pros: submit pilots only if necessary, and quickly fill up the queue if needed + * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue + + :param ceParameters: CE parameters + """ + super().apply(availableSlots, **kwargs) + # Get Task Queues related to the CE + result = self.matcherClient.getMatchingTaskQueues(kwargs["ceParameters"]) + if not result["OK"]: + return 0 + taskQueueDict = result["Value"] + + # Get the number of jobs that would match the capability of the CE + waitingSupportedJobs = 0 + for tq in taskQueueDict.values(): + waitingSupportedJobs += tq["Jobs"] + + # Return the minimum value between the number of slots available and supported jobs + return min(availableSlots, waitingSupportedJobs) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py index 006db96af3b..ad7a89693c3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py @@ -22,7 +22,6 @@ "CE1_Queue1": { "CEName": "CE1", "CEType": "Type1", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site1", "ParametersDict": { @@ -39,7 +38,6 @@ "CE1_Queue2": { "CEName": "CE1", "CEType": "Type1", - "QueryCEFlag": False, "QueueName": "Queue2", "Site": "Site1", "ParametersDict": { @@ -56,7 +54,6 @@ "CE2_Queue1": { "CEName": "CE2", "CEType": "Type2", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site1", "ParametersDict": { @@ -73,7 +70,6 @@ "CE3_Queue1": { "CEName": "CE3", "CEType": "Type2", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site2", "ParametersDict": { diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py new file mode 100644 index 00000000000..66f3ce5209b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py @@ -0,0 +1,91 @@ +""" Test class for Submission policy +""" +# pylint: disable=protected-access + +import pytest +from DIRAC.Core.Utilities.ReturnValues import S_OK + +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import ( + SUBMISSION_POLICIES, + AggressiveFillingPolicy, + WaitingSupportedJobsPolicy, +) + + +def test_AggressiveFillingPolicy(): + """Make sure it always return the number of slots provided""" + policy = AggressiveFillingPolicy() + + # 1. We want to submit 50 elements + numberToSubmit = policy.apply(50) + assert numberToSubmit == 50 + + # 2. We want to submit 0 element + numberToSubmit = policy.apply(0) + assert numberToSubmit == 0 + + # 3. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10) + + +def test_WaitingSupportedJobsPolicy(mocker): + """Make sure it returns the min between the available slots and the jobs available""" + policy = WaitingSupportedJobsPolicy() + + # 1. We want to submit 50 elements without specifying the CE parameters + with pytest.raises(KeyError): + numberToSubmit = policy.apply(50) + + # 2. We want to submit 50 elements but there are no waiting job + # Because it requires an access to a DB, we mock the value returned by the Matcher + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", return_value=S_OK({}) + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 0 + + # 3. We want to submit 50 elements and we have 10 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 10}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 4. We want to submit 50 elements and we have 10 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 5. We want to submit 50 elements and we have 60 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 60}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 7. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10, ceParameters={}) diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py index f1187ba088c..301d71b2131 100644 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py @@ -25,10 +25,8 @@ class Params: def __init__(self): """C'or""" self.status = False - self.taskQueueID = 0 self.switches = [ ("", "status=", "sets the pilot status", self.setStatus), - ("t:", "taskQueueID=", "sets the taskQueueID", self.setTaskQueueID), ] def setStatus(self, value): @@ -45,19 +43,6 @@ def setStatus(self, value): self.status = value return S_OK() - def setTaskQueueID(self, value): - """sets self.taskQueueID - - :param value: option argument - - :return: S_OK()/S_ERROR() - """ - try: - self.taskQueueID = int(value) - except ValueError: - return S_ERROR("TaskQueueID has to be a number") - return S_OK() - @Script() def main(): @@ -89,7 +74,7 @@ def main(): if not DErrno.cmpError(res, DErrno.EWMSNOPILOT): gLogger.error(res["Message"]) DIRACExit(1) - res = pmc.addPilotTQRef([pilotRef], params.taskQueueID, ownerGroup, gridType, {pilotRef: pilotStamp}) + res = pmc.addPilotReferences([pilotRef], ownerGroup, gridType, {pilotRef: pilotStamp}) if not res["OK"]: gLogger.error(res["Message"]) DIRACExit(1) diff --git a/tests/Integration/Monitoring/Test_MonitoringReporter.py b/tests/Integration/Monitoring/Test_MonitoringReporter.py index 66b1f39771c..fe6ebf273f1 100644 --- a/tests/Integration/Monitoring/Test_MonitoringReporter.py +++ b/tests/Integration/Monitoring/Test_MonitoringReporter.py @@ -905,7 +905,6 @@ pilotsHistoryData = [ { - "TaskQueueID": "1240451", "GridSite": "LCG.CNAF.it", "GridType": "", "Status": "failed", @@ -913,7 +912,6 @@ "timestamp": 1649161714000, }, { - "TaskQueueID": "12401", "GridSite": "LCG.CNAF.it", "GridType": "", "Status": "failed", diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py index 91a21ac548a..51cecfab9ca 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py @@ -36,9 +36,8 @@ def preparePilots(stateCount, testSite, testCE, testGroup): for i in range(nPilots): pilotRef.append("pilotRef_" + str(i)) - res = paDB.addPilotTQReference( + res = paDB.addPilotReferences( pilotRef, - 123, testGroup, ) assert res["OK"] is True, res["Message"] @@ -80,7 +79,7 @@ def cleanUpPilots(pilotRef): def test_basic(): """usual insert/verify""" - res = paDB.addPilotTQReference(["pilotRef"], 123, "ownerGroup") + res = paDB.addPilotReferences(["pilotRef"], "ownerGroup") assert res["OK"] is True res = paDB.deletePilot("pilotRef") diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index f97bf7be961..9d0069ff670 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -27,7 +27,7 @@ def test_PilotsDB(): for jobID in ["aPilot", "anotherPilot"]: pilots.deletePilots(jobID) - res = pilots.addPilotTQRef(["aPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["aPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.getCurrentPilotCounters({}) assert res["OK"], res["Message"] @@ -38,7 +38,7 @@ def test_PilotsDB(): assert res["OK"], res["Message"] assert res["Value"] == {} - res = pilots.addPilotTQRef(["anotherPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["anotherPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.storePilotOutput("anotherPilot", "This is an output", "this is an error") assert res["OK"], res["Message"]