From ff771eac7915231ffc9ab8186b9e99b61564565d Mon Sep 17 00:00:00 2001 From: fstagni Date: Wed, 10 Jun 2020 13:07:56 +0200 Subject: [PATCH 1/9] py2 to 3 --- .../Agent/MCExtensionAgent.py | 2 +- .../Agent/TaskManagerAgentBase.py | 10 +++---- .../Agent/TransformationCleaningAgent.py | 2 +- .../Agent/TransformationPlugin.py | 6 ++--- .../Agent/ValidateOutputDataAgent.py | 2 +- TransformationSystem/Client/TaskManager.py | 5 +++- TransformationSystem/Client/Transformation.py | 2 +- .../Client/TransformationCLI.py | 26 +++++++++---------- TransformationSystem/DB/TransformationDB.py | 8 +++--- .../Service/TransformationManagerHandler.py | 2 +- .../Utilities/TransformationInfo.py | 2 +- .../test/Test_TransformationInfo.py | 2 +- 12 files changed, 36 insertions(+), 33 deletions(-) diff --git a/TransformationSystem/Agent/MCExtensionAgent.py b/TransformationSystem/Agent/MCExtensionAgent.py index fb11d4d0ded..791e0b4d574 100755 --- a/TransformationSystem/Agent/MCExtensionAgent.py +++ b/TransformationSystem/Agent/MCExtensionAgent.py @@ -80,7 +80,7 @@ def extendTransformation(self, transID, maxTasks): else: statusDict = res['Value'] gLogger.verbose("Current task count for transformation %d" % transID) - for status in sorted(statusDict.keys()): + for status in sorted(list(statusDict)): statusCount = statusDict[status] gLogger.verbose("%s : %s" % (status.ljust(20), str(statusCount).rjust(8))) # Determine the number of tasks to be created diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index 241803885a9..036fc0d4488 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -285,7 +285,7 @@ def _execute(self, threadID): # Queue was cleared, nothing to do continue try: - transID = transIDOPBody.keys()[0] + transID = list(transIDOPBody)[0] operations = transIDOPBody[transID]['Operations'] if transID not in self.transInQueue: self._logWarn("Got a transf not in transInQueue...?", @@ -327,7 +327,7 @@ def _execute(self, threadID): def updateTaskStatus(self, transIDOPBody, clients): """ Updates the task status """ - transID = transIDOPBody.keys()[0] + transID = list(transIDOPBody)[0] method = 'updateTaskStatus' # Get the tasks which are in an UPDATE state, i.e. job statuses + request-specific statuses @@ -410,7 +410,7 @@ def updateTaskStatus(self, transIDOPBody, clients): def updateFileStatus(self, transIDOPBody, clients): """ Update the files status """ - transID = transIDOPBody.keys()[0] + transID = list(transIDOPBody)[0] method = 'updateFileStatus' timeStamp = str(datetime.datetime.utcnow() - datetime.timedelta(minutes=10)) @@ -480,7 +480,7 @@ def updateFileStatus(self, transIDOPBody, clients): def checkReservedTasks(self, transIDOPBody, clients): """ Checking Reserved tasks """ - transID = transIDOPBody.keys()[0] + transID = list(transIDOPBody)[0] method = 'checkReservedTasks' # Select the tasks which have been in Reserved status for more than 1 hour for selected transformations @@ -546,7 +546,7 @@ def submitTasks(self, transIDOPBody, clients): :return: S_OK/S_ERROR """ - transID = transIDOPBody.keys()[0] + transID = list(transIDOPBody)[0] transBody = transIDOPBody[transID]['Body'] owner = transIDOPBody[transID]['Owner'] ownerGroup = transIDOPBody[transID]['OwnerGroup'] diff --git a/TransformationSystem/Agent/TransformationCleaningAgent.py b/TransformationSystem/Agent/TransformationCleaningAgent.py index a5f126965e5..2a346191166 100644 --- a/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -348,7 +348,7 @@ def __getCatalogDirectoryContents(self, directories): activeDirs.extend(dirContents['SubDirs']) allFiles.update(dirContents['Files']) self.log.info("Found %d files" % len(allFiles)) - return S_OK(allFiles.keys()) + return S_OK(list(allFiles)) def cleanTransformationLogFiles(self, directory): """ clean up transformation logs from directory :directory: diff --git a/TransformationSystem/Agent/TransformationPlugin.py b/TransformationSystem/Agent/TransformationPlugin.py index f92d4d429cc..ead4c3fb49f 100644 --- a/TransformationSystem/Agent/TransformationPlugin.py +++ b/TransformationSystem/Agent/TransformationPlugin.py @@ -177,11 +177,11 @@ def _ByShare(self, shareType='CPU'): return res cpuShares = res['Value'] self.util.logInfo("Obtained the following target shares (%):") - for site in sorted(cpuShares.keys()): + for site in sorted(list(cpuShares)): self.util.logInfo("%s: %.1f" % (site.ljust(15), cpuShares[site])) # Get the existing destinations from the transformationDB - res = self.util.getExistingCounters(requestedSites=cpuShares.keys()) + res = self.util.getExistingCounters(requestedSites=list(cpuShares)) if not res['OK']: self.util.logError("Failed to get existing file share", res['Message']) return res @@ -189,7 +189,7 @@ def _ByShare(self, shareType='CPU'): if existingCount: self.util.logInfo("Existing site utilization (%):") normalisedExistingCount = self.util._normaliseShares(existingCount.copy()) # pylint: disable=protected-access - for se in sorted(normalisedExistingCount.keys()): + for se in sorted(list(normalisedExistingCount)): self.util.logInfo("%s: %.1f" % (se.ljust(15), normalisedExistingCount[se])) # Group the input files by their existing replicas diff --git a/TransformationSystem/Agent/ValidateOutputDataAgent.py b/TransformationSystem/Agent/ValidateOutputDataAgent.py index 32d2ee1aadb..049321bf51e 100755 --- a/TransformationSystem/Agent/ValidateOutputDataAgent.py +++ b/TransformationSystem/Agent/ValidateOutputDataAgent.py @@ -197,7 +197,7 @@ def checkTransformationIntegrity(self, transID): if res['Value']['Failed']: return S_ERROR("Failed to determine the existance of directories") directoryExists = res['Value']['Successful'] - for directory in sorted(directoryExists.keys()): + for directory in sorted(list(directoryExists)): if not directoryExists[directory]: continue iRes = self.consistencyInspector.catalogDirectoryToSE(directory) diff --git a/TransformationSystem/Client/TaskManager.py b/TransformationSystem/Client/TaskManager.py index 89ea9ed31f0..eee94d21323 100644 --- a/TransformationSystem/Client/TaskManager.py +++ b/TransformationSystem/Client/TaskManager.py @@ -391,7 +391,7 @@ def getSubmittedFileStatus(self, fileDicts): if transID is None: return S_OK({}) - res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': taskFiles.keys()}) + res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': list(taskFiles)}) if not res['OK']: return res requestFiles = {} @@ -668,10 +668,12 @@ def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN): method = '__prepareTasks' startTime = time.time() + oJobTemplate = self.jobClass(transBody) oJobTemplate.setOwner(owner) oJobTemplate.setOwnerGroup(ownerGroup) oJobTemplate.setOwnerDN(ownerDN) + try: site = oJobTemplate.workflow.findParameter('Site').getValue() except AttributeError: @@ -679,6 +681,7 @@ def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN): jobType = oJobTemplate.workflow.findParameter('JobType').getValue() templateOK = False getOutputDataTiming = 0. + for taskID, paramsDict in taskDict.iteritems(): # Create a job for each task and add it to the taskDict if not templateOK: diff --git a/TransformationSystem/Client/Transformation.py b/TransformationSystem/Client/Transformation.py index a29eb2943b5..0cff5c91eaa 100644 --- a/TransformationSystem/Client/Transformation.py +++ b/TransformationSystem/Client/Transformation.py @@ -187,7 +187,7 @@ def __getattr__(self, name): def __getParam(self): if self.item_called == 'Available': - return S_OK(self.paramTypes.keys()) + return S_OK(list(self.paramTypes)) if self.item_called == 'Parameters': return S_OK(self.paramValues) if self.item_called in self.paramValues: diff --git a/TransformationSystem/Client/TransformationCLI.py b/TransformationSystem/Client/TransformationCLI.py index cdf02a85909..37f96a1f385 100644 --- a/TransformationSystem/Client/TransformationCLI.py +++ b/TransformationSystem/Client/TransformationCLI.py @@ -541,11 +541,11 @@ def do_replicas(self, args): if not res['OK']: print("failed to get any replica information: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to get replica information for %s: %s" % (lfn, error)) - for lfn in sorted(res['Value']['Successful'].keys()): - ses = sorted(res['Value']['Successful'][lfn].keys()) + for lfn in sorted(list(res['Value']['Successful'])): + ses = sorted(list(res['Value']['Successful'][lfn])) outStr = "%s :" % lfn.ljust(100) for se in ses: outStr = "%s %s" % (outStr, se.ljust(15)) @@ -568,10 +568,10 @@ def do_addFile(self, args): if not res['OK']: print("failed to add any files: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to add %s: %s" % (lfn, error)) - for lfn in sorted(res['Value']['Successful'].keys()): + for lfn in sorted(list(res['Value']['Successful'])): print("added %s" % lfn) def do_removeFile(self, args): @@ -587,10 +587,10 @@ def do_removeFile(self, args): if not res['OK']: print("failed to remove any files: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to remove %s: %s" % (lfn, error)) - for lfn in sorted(res['Value']['Successful'].keys()): + for lfn in sorted(list(res['Value']['Successful'])): print("removed %s" % lfn) def do_addReplica(self, args): @@ -610,10 +610,10 @@ def do_addReplica(self, args): if not res['OK']: print("failed to add replica: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to add replica: %s" % (error)) - for lfn in sorted(res['Value']['Successful'].keys()): + for lfn in sorted(list(res['Value']['Successful'])): print("added %s" % lfn) def do_removeReplica(self, args): @@ -633,10 +633,10 @@ def do_removeReplica(self, args): if not res['OK']: print("failed to remove replica: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to remove replica: %s" % (error)) - for lfn in sorted(res['Value']['Successful'].keys()): + for lfn in sorted(list(res['Value']['Successful'])): print("removed %s" % lfn) def do_setReplicaStatus(self, args): @@ -663,10 +663,10 @@ def do_setReplicaStatus(self, args): if not res['OK']: print("failed to set replica status: %s" % res['Message']) return - for lfn in sorted(res['Value']['Failed'].keys()): + for lfn in sorted(list(res['Value']['Failed'])): error = res['Value']['Failed'][lfn] print("failed to set replica status: %s" % (error)) - for lfn in sorted(res['Value']['Successful'].keys()): + for lfn in sorted(list(res['Value']['Successful'])): print("updated replica status %s" % lfn) diff --git a/TransformationSystem/DB/TransformationDB.py b/TransformationSystem/DB/TransformationDB.py index e75ba36d298..3d72e58f0b6 100755 --- a/TransformationSystem/DB/TransformationDB.py +++ b/TransformationSystem/DB/TransformationDB.py @@ -524,7 +524,7 @@ def addFilesToTransformation(self, transName, lfns, connection=False): # Attach files to transformation successful = {} if fileIDs: - res = self.__addFilesToTransformation(transID, fileIDs.keys(), connection=connection) + res = self.__addFilesToTransformation(transID, list(fileIDs), connection=connection) if not res['OK']: return res for fileID in fileIDs: @@ -550,7 +550,7 @@ def getTransformationFiles(self, condDict=None, older=None, newer=None, timeStam if not res['OK']: return res originalFileIDs = res['Value'][0] - condDict['FileID'] = originalFileIDs.keys() + condDict['FileID'] = list(originalFileIDs) for val in condDict.itervalues(): if not val: @@ -1536,10 +1536,10 @@ def removeFile(self, lfns, connection=False): if lfn not in lfnFilesIDs: successful[lfn] = 'File does not exist' if fileIDs: - res = self.__setTransformationFileStatus(fileIDs.keys(), 'Deleted', connection=connection) + res = self.__setTransformationFileStatus(list(fileIDs), 'Deleted', connection=connection) if not res['OK']: return res - res = self.__setDataFileStatus(fileIDs.keys(), 'Deleted', connection=connection) + res = self.__setDataFileStatus(list(fileIDs), 'Deleted', connection=connection) if not res['OK']: return S_ERROR("TransformationDB.removeFile: Failed to remove files.") for lfn in lfnFilesIDs: diff --git a/TransformationSystem/Service/TransformationManagerHandler.py b/TransformationSystem/Service/TransformationManagerHandler.py index c31e8b24815..6cd8a1b6fcb 100644 --- a/TransformationSystem/Service/TransformationManagerHandler.py +++ b/TransformationSystem/Service/TransformationManagerHandler.py @@ -392,7 +392,7 @@ def export_removeFile(self, lfns): """ Interface provides [ LFN1, LFN2, ... ] """ if isinstance(lfns, dict): - lfns = lfns.keys() + lfns = list(lfns) res = database.removeFile(lfns) return self._parseRes(res) diff --git a/TransformationSystem/Utilities/TransformationInfo.py b/TransformationSystem/Utilities/TransformationInfo.py index ff51d8e4a7e..509151e28eb 100644 --- a/TransformationSystem/Utilities/TransformationInfo.py +++ b/TransformationSystem/Utilities/TransformationInfo.py @@ -155,7 +155,7 @@ def cleanOutputs(self, jobInfo): for lfn, err in result['Value']['Failed'].items(): reason = str(err) errorReasons[reason].append(lfn) - successfullyRemoved += len(result['Value']['Successful'].keys()) + successfullyRemoved += len(list(result['Value']['Successful'])) for reason, lfns in errorReasons.items(): self.log.error("Failed to remove %d files with error: %s" % (len(lfns), reason)) self.log.notice("Successfully removed %d files" % successfullyRemoved) diff --git a/TransformationSystem/test/Test_TransformationInfo.py b/TransformationSystem/test/Test_TransformationInfo.py index beb2a7141e1..30ab7a5dba8 100644 --- a/TransformationSystem/test/Test_TransformationInfo.py +++ b/TransformationSystem/test/Test_TransformationInfo.py @@ -352,7 +352,7 @@ def test_getJobs(tiFixture): assert ndone == 3 assert nfailed == 3 assert isinstance(jobs, OrderedDict) - assert [56, 89, 123, 456, 789, 1123] == jobs.keys() + assert [56, 89, 123, 456, 789, 1123] == list(jobs) # All ERROR tiFixture.jobMon.getJobs = Mock() From 596f1e43c2b9251a930f74c9f51e15bc8d2f725c Mon Sep 17 00:00:00 2001 From: fstagni Date: Thu, 11 Jun 2020 16:57:48 +0200 Subject: [PATCH 2/9] changed dir --- virtualmachine/Vagrantfile | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/virtualmachine/Vagrantfile b/virtualmachine/Vagrantfile index 9337da73b4c..2a5516ae3cf 100644 --- a/virtualmachine/Vagrantfile +++ b/virtualmachine/Vagrantfile @@ -50,7 +50,8 @@ Vagrant.configure("2") do |config| # the path on the host to the actual folder. The second argument is # the path on the guest to mount the folder. And the optional third # argument is a set of non-required options. - config.vm.synced_folder "..", "/opt/dirac/versions/hostcode" + config.vm.synced_folder "..", "/home/vagrant/hostcode/DIRAC" + config.vm.synced_folder "../../WebAppDIRAC", "/home/vagrant/hostcode/WebAppDIRAC" config.vm.synced_folder "../../certs", "/home/vagrant/.globus" # Provider-specific configuration so you can fine-tune various @@ -77,16 +78,16 @@ Vagrant.configure("2") do |config| # config.vm.provision "shell", inline: <<-SHELL # #!/bin/bash - # # Create DIRAC dirs - # mkdir -p /opt/dirac/DIRAC && \ - # mkdir -p /opt/dirac/etc/grid-security/certificates && \ - # cd /opt/dirac + # sudo chown vagrant:vagrant hostcode/ + # ln -s hostcode/DIRAC/ DIRAC + # # Create etc dir and link it + # mkdir -p /home/vagrant/etc/grid-security/certificates + # ln -s /home/vagrant/etc /home/vagrant/hostcode/etc # # Installing DIRAC in /opt/dirac # curl -L -o dirac-install https://raw.githubusercontent.com/DIRACGrid/DIRAC/integration/Core/scripts/dirac-install.py && \ # chmod +x dirac-install && \ # ./dirac-install -r $DIRAC_VERSION -t client && \ - # rm -rf /opt/dirac/.installCache && \ # rm dirac-install && \ # ln -s /etc/grid-security/certificates/ /opt/dirac/etc/grid-security/certificates From 79100ebd2c6bde64a216168ffa7f85cc33dd451f Mon Sep 17 00:00:00 2001 From: fstagni Date: Thu, 11 Jun 2020 17:29:58 +0200 Subject: [PATCH 3/9] added more selectable fields for methods using gJobDB.getDistinctJobAttributes --- .../Service/JobMonitoringHandler.py | 33 +++++++++---------- .../Test_Client_WMS.py | 15 ++++++--- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/WorkloadManagementSystem/Service/JobMonitoringHandler.py index ccba404d2ca..3167a375e88 100755 --- a/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -74,80 +74,79 @@ def initialize(self): types_getApplicationStates = [] @staticmethod - def export_getApplicationStates(): + def export_getApplicationStates(condDict=None, older=None, newer=None): """ Return Distinct Values of ApplicationStatus job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('ApplicationStatus') + return gJobDB.getDistinctJobAttributes('ApplicationStatus', condDict, older, newer) ############################################################################## types_getJobTypes = [] @staticmethod - def export_getJobTypes(): + def export_getJobTypes(condDict=None, older=None, newer=None): """ Return Distinct Values of JobType job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('JobType') + return gJobDB.getDistinctJobAttributes('JobType', condDict, older, newer) ############################################################################## types_getOwners = [] @staticmethod - def export_getOwners(): + def export_getOwners(condDict=None, older=None, newer=None): """ Return Distinct Values of Owner job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('Owner') + return gJobDB.getDistinctJobAttributes('Owner', condDict, older, newer) ############################################################################## types_getProductionIds = [] @staticmethod - def export_getProductionIds(): + def export_getProductionIds(condDict=None, older=None, newer=None): """ Return Distinct Values of ProductionId job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('JobGroup') + return gJobDB.getDistinctJobAttributes('JobGroup', condDict, older, newer) ############################################################################## types_getJobGroups = [] @staticmethod - def export_getJobGroups(condDict=None, cutDate=None): + def export_getJobGroups(condDict=None, older=None, cutDate=None): """ Return Distinct Values of ProductionId job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('JobGroup', condDict, - newer=cutDate) + return gJobDB.getDistinctJobAttributes('JobGroup', condDict, older, newer=cutDate) ############################################################################## types_getSites = [] @staticmethod - def export_getSites(): + def export_getSites(condDict=None, older=None, newer=None): """ Return Distinct Values of Site job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('Site') + return gJobDB.getDistinctJobAttributes('Site', condDict, older, newer) ############################################################################## types_getStates = [] @staticmethod - def export_getStates(): + def export_getStates(condDict=None, older=None, newer=None): """ Return Distinct Values of Status job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('Status') + return gJobDB.getDistinctJobAttributes('Status', condDict, older, newer) ############################################################################## types_getMinorStates = [] @staticmethod - def export_getMinorStates(): + def export_getMinorStates(condDict=None, older=None, newer=None): """ Return Distinct Values of Minor Status job Attribute in WMS """ - return gJobDB.getDistinctJobAttributes('MinorStatus') + return gJobDB.getDistinctJobAttributes('MinorStatus', condDict, older, newer) ############################################################################## types_getJobs = [] diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index 87cf125b429..30043850e8e 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -26,7 +26,7 @@ # pylint: disable=protected-access,wrong-import-position,invalid-name from __future__ import print_function -from past.builtins import long + import unittest import sys import datetime @@ -365,6 +365,15 @@ def test_JobStateUpdateAndJobMonitoringMultuple(self): self.assertTrue(res['OK'], res.get('Message')) res = jobMonitor.getJobGroups() self.assertTrue(res['OK'], res.get('Message')) + resJG_empty = res['Value'] + res = jobMonitor.getJobGroups(None, datetime.datetime.utcnow()) + self.assertTrue(res['OK'], res.get('Message')) + resJG_olderThanNow = res['Value'] + self.assertEqual(resJG_empty, resJG_olderThanNow) + res = jobMonitor.getJobGroups(None, datetime.datetime.utcnow() - datetime.timedelta(days=365)) + self.assertTrue(res['OK'], res.get('Message')) + resJG_olderThanOneYear = res['Value'] + self.assertTrue(set(resJG_olderThanOneYear).issubset(set(resJG_olderThanNow))) res = jobMonitor.getStates() self.assertTrue(res['OK'], res.get('Message')) self.assertTrue(sorted(res['Value']) in [['Received'], sorted(['Received', 'Waiting'])]) @@ -382,9 +391,7 @@ def test_JobStateUpdateAndJobMonitoringMultuple(self): try: self.assertTrue( res['Value'].get('Received') + - res['Value'].get('Waiting') >= long( - len(lfnss) * - len(types))) + res['Value'].get('Waiting') >= int(len(lfnss) * len(types))) except TypeError: pass res = jobMonitor.getJobsSummary(jobIDs) From 938fa731c6c95595f9cfd843c865a23cc9a4f763 Mon Sep 17 00:00:00 2001 From: fstagni Date: Thu, 11 Jun 2020 18:07:00 +0200 Subject: [PATCH 4/9] TransformationCleaningAgent will (re)clean very old transformations that are still in the system --- .../Agent/TransformationCleaningAgent.py | 58 ++++++++++++++++--- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/TransformationSystem/Agent/TransformationCleaningAgent.py b/TransformationSystem/Agent/TransformationCleaningAgent.py index 2a346191166..f8fb6ae2931 100644 --- a/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -23,16 +23,17 @@ from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Utilities.Proxy import executeWithUserProxy from DIRAC.Core.Utilities.DErrno import cmpError +from DIRAC.Core.Utilities.ReturnValues import returnSingleResult +from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient -from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient -from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient from DIRAC.DataManagementSystem.Client.DataManager import DataManager -from DIRAC.Resources.Storage.StorageElement import StorageElement -from DIRAC.Core.Utilities.ReturnValues import returnSingleResult +from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient from DIRAC.Resources.Catalog.FileCatalog import FileCatalog -from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData +from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient +from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient +from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient # # agent's name AGENT_NAME = 'Transformation/TransformationCleaningAgent' @@ -83,7 +84,7 @@ def __init__(self, *args, **kwargs): def initialize(self): """ agent initialisation - reading and setting confing opts + reading and setting config opts :param self: self reference """ @@ -124,6 +125,49 @@ def initialize(self): # # file catalog client self.metadataClient = FileCatalogClient() + # Only at (re)start: will clean ancient transformations (remnants) + # 1) get the transformation IDs of jobs that are older than 1 year + # 2) find the status of those transformations. Those "Cleaned" and "Archived" will be + # cleaned and archived (again) + res = JobMonitoringClient().getJobGroups(None, datetime.utcnow() - timedelta(days=365)) + if not res['OK']: + self.log.error("Failed to get job groups", res['Message']) + return res + transformationIDs = res['Value'] + if transformationIDs: + res = TransformationClient().getTransformations({'TransformationID': list(transformationIDs)}) + if not res['OK']: + self.log.error("Failed to get transformations", res['Message']) + return res + transformations = res['Value'] + toClean = [] + toArchive = [] + for transDict in transformations: + if transDict['Status'] == 'Cleaned': + toClean.append(transDict) + if transDict['Status'] == 'Archived': + toArchive.append(transDict) + + for transDict in toClean: + if self.shifterProxy: + self._executeClean(transDict) + else: + self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeClean)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) + + for transDict in toArchive: + if self.shifterProxy: + self._executeArchive(transDict) + else: + self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeArchive)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) + return S_OK() ############################################################################# From 8f2db06125f43de130e52f00dd9706feed95ab02 Mon Sep 17 00:00:00 2001 From: fstagni Date: Fri, 12 Jun 2020 12:34:42 +0200 Subject: [PATCH 5/9] sanitize DNs --- ConfigurationSystem/Client/Helpers/Registry.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ConfigurationSystem/Client/Helpers/Registry.py b/ConfigurationSystem/Client/Helpers/Registry.py index eaa337add26..13eeef17da8 100644 --- a/ConfigurationSystem/Client/Helpers/Registry.py +++ b/ConfigurationSystem/Client/Helpers/Registry.py @@ -23,6 +23,7 @@ def getUsernameForDN(dn, usersList=None): :return: S_OK(str)/S_ERROR() """ + dn = dn.strip() if not usersList: result = gConfig.getSections("%s/Users" % gBaseRegistrySection) if not result['OK']: @@ -63,6 +64,7 @@ def getGroupsForDN(dn): :return: S_OK(list)/S_ERROR() -- contain list of groups """ + dn = dn.strip() result = getUsernameForDN(dn) if not result['OK']: return result @@ -128,6 +130,7 @@ def getHostnameForDN(dn): :return: S_OK()/S_ERROR() """ + dn = dn.strip() result = gConfig.getSections("%s/Hosts" % gBaseRegistrySection) if not result['OK']: return result @@ -153,6 +156,7 @@ def findDefaultGroupForDN(dn): :return: S_OK()/S_ERROR() """ + dn = dn.strip() result = getUsernameForDN(dn) if not result['OK']: return result From b371142dc4e484ce565f0fea6d60180b3eda83a5 Mon Sep 17 00:00:00 2001 From: fstagni Date: Fri, 12 Jun 2020 12:57:31 +0200 Subject: [PATCH 6/9] no need for casting to list --- .../Agent/MCExtensionAgent.py | 2 +- .../Agent/TransformationPlugin.py | 4 +-- .../Agent/ValidateOutputDataAgent.py | 2 +- .../Client/TransformationCLI.py | 26 +++++++++---------- .../Utilities/TransformationInfo.py | 8 +++--- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/TransformationSystem/Agent/MCExtensionAgent.py b/TransformationSystem/Agent/MCExtensionAgent.py index 791e0b4d574..953349bdf37 100755 --- a/TransformationSystem/Agent/MCExtensionAgent.py +++ b/TransformationSystem/Agent/MCExtensionAgent.py @@ -80,7 +80,7 @@ def extendTransformation(self, transID, maxTasks): else: statusDict = res['Value'] gLogger.verbose("Current task count for transformation %d" % transID) - for status in sorted(list(statusDict)): + for status in sorted(statusDict): statusCount = statusDict[status] gLogger.verbose("%s : %s" % (status.ljust(20), str(statusCount).rjust(8))) # Determine the number of tasks to be created diff --git a/TransformationSystem/Agent/TransformationPlugin.py b/TransformationSystem/Agent/TransformationPlugin.py index ead4c3fb49f..98811de4cdb 100644 --- a/TransformationSystem/Agent/TransformationPlugin.py +++ b/TransformationSystem/Agent/TransformationPlugin.py @@ -177,7 +177,7 @@ def _ByShare(self, shareType='CPU'): return res cpuShares = res['Value'] self.util.logInfo("Obtained the following target shares (%):") - for site in sorted(list(cpuShares)): + for site in sorted(cpuShares): self.util.logInfo("%s: %.1f" % (site.ljust(15), cpuShares[site])) # Get the existing destinations from the transformationDB @@ -189,7 +189,7 @@ def _ByShare(self, shareType='CPU'): if existingCount: self.util.logInfo("Existing site utilization (%):") normalisedExistingCount = self.util._normaliseShares(existingCount.copy()) # pylint: disable=protected-access - for se in sorted(list(normalisedExistingCount)): + for se in sorted(normalisedExistingCount): self.util.logInfo("%s: %.1f" % (se.ljust(15), normalisedExistingCount[se])) # Group the input files by their existing replicas diff --git a/TransformationSystem/Agent/ValidateOutputDataAgent.py b/TransformationSystem/Agent/ValidateOutputDataAgent.py index 049321bf51e..87d194825a6 100755 --- a/TransformationSystem/Agent/ValidateOutputDataAgent.py +++ b/TransformationSystem/Agent/ValidateOutputDataAgent.py @@ -197,7 +197,7 @@ def checkTransformationIntegrity(self, transID): if res['Value']['Failed']: return S_ERROR("Failed to determine the existance of directories") directoryExists = res['Value']['Successful'] - for directory in sorted(list(directoryExists)): + for directory in sorted(directoryExists): if not directoryExists[directory]: continue iRes = self.consistencyInspector.catalogDirectoryToSE(directory) diff --git a/TransformationSystem/Client/TransformationCLI.py b/TransformationSystem/Client/TransformationCLI.py index 37f96a1f385..d209c0960bc 100644 --- a/TransformationSystem/Client/TransformationCLI.py +++ b/TransformationSystem/Client/TransformationCLI.py @@ -541,11 +541,11 @@ def do_replicas(self, args): if not res['OK']: print("failed to get any replica information: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to get replica information for %s: %s" % (lfn, error)) - for lfn in sorted(list(res['Value']['Successful'])): - ses = sorted(list(res['Value']['Successful'][lfn])) + for lfn in sorted(res['Value']['Successful']): + ses = sorted(res['Value']['Successful'][lfn]) outStr = "%s :" % lfn.ljust(100) for se in ses: outStr = "%s %s" % (outStr, se.ljust(15)) @@ -568,10 +568,10 @@ def do_addFile(self, args): if not res['OK']: print("failed to add any files: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to add %s: %s" % (lfn, error)) - for lfn in sorted(list(res['Value']['Successful'])): + for lfn in sorted(res['Value']['Successful']): print("added %s" % lfn) def do_removeFile(self, args): @@ -587,10 +587,10 @@ def do_removeFile(self, args): if not res['OK']: print("failed to remove any files: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to remove %s: %s" % (lfn, error)) - for lfn in sorted(list(res['Value']['Successful'])): + for lfn in sorted(res['Value']['Successful']): print("removed %s" % lfn) def do_addReplica(self, args): @@ -610,10 +610,10 @@ def do_addReplica(self, args): if not res['OK']: print("failed to add replica: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to add replica: %s" % (error)) - for lfn in sorted(list(res['Value']['Successful'])): + for lfn in sorted(res['Value']['Successful']): print("added %s" % lfn) def do_removeReplica(self, args): @@ -633,10 +633,10 @@ def do_removeReplica(self, args): if not res['OK']: print("failed to remove replica: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to remove replica: %s" % (error)) - for lfn in sorted(list(res['Value']['Successful'])): + for lfn in sorted(res['Value']['Successful']): print("removed %s" % lfn) def do_setReplicaStatus(self, args): @@ -663,10 +663,10 @@ def do_setReplicaStatus(self, args): if not res['OK']: print("failed to set replica status: %s" % res['Message']) return - for lfn in sorted(list(res['Value']['Failed'])): + for lfn in sorted(res['Value']['Failed']): error = res['Value']['Failed'][lfn] print("failed to set replica status: %s" % (error)) - for lfn in sorted(list(res['Value']['Successful'])): + for lfn in sorted(res['Value']['Successful']): print("updated replica status %s" % lfn) diff --git a/TransformationSystem/Utilities/TransformationInfo.py b/TransformationSystem/Utilities/TransformationInfo.py index 509151e28eb..c47da7dce90 100644 --- a/TransformationSystem/Utilities/TransformationInfo.py +++ b/TransformationSystem/Utilities/TransformationInfo.py @@ -152,10 +152,10 @@ def cleanOutputs(self, jobInfo): if not result['OK']: self.log.error("Failed to remove LFNs", result['Message']) raise RuntimeError("Failed to remove LFNs: %s" % result['Message']) - for lfn, err in result['Value']['Failed'].items(): - reason = str(err) - errorReasons[reason].append(lfn) - successfullyRemoved += len(list(result['Value']['Successful'])) + for lfn, err in result['Value']['Failed'].items(): + reason = str(err) + errorReasons[reason].append(lfn) + successfullyRemoved += len(result['Value']['Successful']) for reason, lfns in errorReasons.items(): self.log.error("Failed to remove %d files with error: %s" % (len(lfns), reason)) self.log.notice("Successfully removed %d files" % successfullyRemoved) From f9af4b86680fe5ee95e4a665c04026a7251647a8 Mon Sep 17 00:00:00 2001 From: fstagni Date: Fri, 12 Jun 2020 16:24:53 +0200 Subject: [PATCH 7/9] moved clean of very old transformations to the finalize --- .../Agent/TransformationCleaningAgent.py | 102 ++++++++++-------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/TransformationSystem/Agent/TransformationCleaningAgent.py b/TransformationSystem/Agent/TransformationCleaningAgent.py index f8fb6ae2931..390ffadd6a7 100644 --- a/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -124,49 +124,8 @@ def initialize(self): self.reqClient = ReqClient() # # file catalog client self.metadataClient = FileCatalogClient() - - # Only at (re)start: will clean ancient transformations (remnants) - # 1) get the transformation IDs of jobs that are older than 1 year - # 2) find the status of those transformations. Those "Cleaned" and "Archived" will be - # cleaned and archived (again) - res = JobMonitoringClient().getJobGroups(None, datetime.utcnow() - timedelta(days=365)) - if not res['OK']: - self.log.error("Failed to get job groups", res['Message']) - return res - transformationIDs = res['Value'] - if transformationIDs: - res = TransformationClient().getTransformations({'TransformationID': list(transformationIDs)}) - if not res['OK']: - self.log.error("Failed to get transformations", res['Message']) - return res - transformations = res['Value'] - toClean = [] - toArchive = [] - for transDict in transformations: - if transDict['Status'] == 'Cleaned': - toClean.append(transDict) - if transDict['Status'] == 'Archived': - toArchive.append(transDict) - - for transDict in toClean: - if self.shifterProxy: - self._executeClean(transDict) - else: - self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % - transDict) - executeWithUserProxy(self._executeClean)(transDict, - proxyUserDN=transDict['AuthorDN'], - proxyUserGroup=transDict['AuthorGroup']) - - for transDict in toArchive: - if self.shifterProxy: - self._executeArchive(transDict) - else: - self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % - transDict) - executeWithUserProxy(self._executeArchive)(transDict, - proxyUserDN=transDict['AuthorDN'], - proxyUserGroup=transDict['AuthorGroup']) + # # job monitoring client + self.jobMonitoringClient = JobMonitoringClient() return S_OK() @@ -234,6 +193,63 @@ def execute(self): self.log.error("Could not get the transformations", res['Message']) return S_OK() + def finalize(self): + """ Only at finalization: will clean ancient transformations (remnants) + 1) get the transformation IDs of jobs that are older than 1 year + 2) find the status of those transformations. Those "Cleaned" and "Archived" will be + cleaned and archived (again) + """ + res = self.jobMonitoringClient.getJobGroups(None, datetime.utcnow() - timedelta(days=365)) + if not res['OK']: + self.log.error("Failed to get job groups", res['Message']) + return res + transformationIDs = res['Value'] + if transformationIDs: + res = self.transClient.getTransformations({'TransformationID': transformationIDs}) + if not res['OK']: + self.log.error("Failed to get transformations", res['Message']) + return res + transformations = res['Value'] + toClean = [] + toArchive = [] + for transDict in transformations: + if transDict['Status'] == 'Cleaned': + toClean.append(transDict) + if transDict['Status'] == 'Archived': + toArchive.append(transDict) + + for transDict in toClean: + if self.shifterProxy: + self._executeClean(transDict) + else: + self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeClean)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) + + for transDict in toArchive: + if self.shifterProxy: + self._executeArchive(transDict) + else: + self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeArchive)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) + + # Remove JobIDs that were unknown to the TransformationSystem + jobGroupsToCheck = [str(transDict['TransformationID']).zfill(8) for transDict in toClean + toArchive] + res = self.jobMonitoringClient.getJobs({'JobGroup': jobGroupsToCheck}) + if not res['OK']: + return res + jobIDsToRemove = [int(jobID) for jobID in res['Value']] + res = self.__removeWMSTasks(jobIDsToRemove) + if not res['OK']: + return res + + return S_OK() + def _executeClean(self, transDict): """Clean transformation.""" # if transformation is of type `Replication` or `Removal`, there is nothing to clean. From 423ef412c67e8e4c68d56b5bf1fb7c0df3fc2c12 Mon Sep 17 00:00:00 2001 From: fstagni Date: Thu, 25 Jun 2020 15:27:45 +0200 Subject: [PATCH 8/9] added explanation --- .../Agent/TransformationCleaningAgent.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/TransformationSystem/Agent/TransformationCleaningAgent.py b/TransformationSystem/Agent/TransformationCleaningAgent.py index 390ffadd6a7..02e1bbd84ad 100644 --- a/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -198,6 +198,22 @@ def finalize(self): 1) get the transformation IDs of jobs that are older than 1 year 2) find the status of those transformations. Those "Cleaned" and "Archived" will be cleaned and archived (again) + + Why doing this here? Basically, it's a race: + 1) the production manager submits a transformation + 2) the TransformationAgent, and a bit later the WorkflowTaskAgent, put such transformation in their internal queue, + so eventually during their (long-ish) cycle they'll work on it. + 3) 1 minute after creating the transformation, the production manager cleans it (by hand, for whatever reason). + So, the status is changed to "Cleaning" + 4) the TransformationCleaningAgent cleans what has been created (maybe, nothing), + then sets the transformation status to "Cleaned" or "Archived" + 5) a bit later the TransformationAgent, and later the WorkflowTaskAgent, kick in, + creating tasks and jobs for a production that's effectively cleaned (but these 2 agents don't know yet). + + Of course, one could make one final check in TransformationAgent or WorkflowTaskAgent, + but these 2 agents are already doing a lot of stuff, and are pretty heavy. + So, we should just clean from time to time. + What I added here is done only when the agent finalize, and it's quite light-ish operation anyway. """ res = self.jobMonitoringClient.getJobGroups(None, datetime.utcnow() - timedelta(days=365)) if not res['OK']: From c6d239e51265445b389b2dfdc0806db6ffdd16ae Mon Sep 17 00:00:00 2001 From: fstagni Date: Thu, 25 Jun 2020 15:30:16 +0200 Subject: [PATCH 9/9] implement suggestions on spacing in lists --- .../Agent/TaskManagerAgentBase.py | 2 +- .../Agent/TransformationCleaningAgent.py | 64 ++++++++++--------- TransformationSystem/DB/TransformationDB.py | 2 +- .../Utilities/TransformationInfo.py | 8 +-- .../Test_Client_WMS.py | 2 +- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index 036fc0d4488..879a6774ddb 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -285,7 +285,7 @@ def _execute(self, threadID): # Queue was cleared, nothing to do continue try: - transID = list(transIDOPBody)[0] + transID = list(transIDOPBody)[0] operations = transIDOPBody[transID]['Operations'] if transID not in self.transInQueue: self._logWarn("Got a transf not in transInQueue...?", diff --git a/TransformationSystem/Agent/TransformationCleaningAgent.py b/TransformationSystem/Agent/TransformationCleaningAgent.py index 02e1bbd84ad..74d49bd24ae 100644 --- a/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -195,20 +195,22 @@ def execute(self): def finalize(self): """ Only at finalization: will clean ancient transformations (remnants) - 1) get the transformation IDs of jobs that are older than 1 year - 2) find the status of those transformations. Those "Cleaned" and "Archived" will be - cleaned and archived (again) + + 1) get the transformation IDs of jobs that are older than 1 year + 2) find the status of those transformations. Those "Cleaned" and "Archived" will be + cleaned and archived (again) Why doing this here? Basically, it's a race: + 1) the production manager submits a transformation 2) the TransformationAgent, and a bit later the WorkflowTaskAgent, put such transformation in their internal queue, - so eventually during their (long-ish) cycle they'll work on it. + so eventually during their (long-ish) cycle they'll work on it. 3) 1 minute after creating the transformation, the production manager cleans it (by hand, for whatever reason). - So, the status is changed to "Cleaning" + So, the status is changed to "Cleaning" 4) the TransformationCleaningAgent cleans what has been created (maybe, nothing), - then sets the transformation status to "Cleaned" or "Archived" + then sets the transformation status to "Cleaned" or "Archived" 5) a bit later the TransformationAgent, and later the WorkflowTaskAgent, kick in, - creating tasks and jobs for a production that's effectively cleaned (but these 2 agents don't know yet). + creating tasks and jobs for a production that's effectively cleaned (but these 2 agents don't know yet). Of course, one could make one final check in TransformationAgent or WorkflowTaskAgent, but these 2 agents are already doing a lot of stuff, and are pretty heavy. @@ -223,46 +225,46 @@ def finalize(self): if transformationIDs: res = self.transClient.getTransformations({'TransformationID': transformationIDs}) if not res['OK']: - self.log.error("Failed to get transformations", res['Message']) - return res + self.log.error("Failed to get transformations", res['Message']) + return res transformations = res['Value'] toClean = [] toArchive = [] for transDict in transformations: - if transDict['Status'] == 'Cleaned': - toClean.append(transDict) - if transDict['Status'] == 'Archived': - toArchive.append(transDict) + if transDict['Status'] == 'Cleaned': + toClean.append(transDict) + if transDict['Status'] == 'Archived': + toArchive.append(transDict) for transDict in toClean: - if self.shifterProxy: - self._executeClean(transDict) - else: - self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % - transDict) - executeWithUserProxy(self._executeClean)(transDict, - proxyUserDN=transDict['AuthorDN'], - proxyUserGroup=transDict['AuthorGroup']) + if self.shifterProxy: + self._executeClean(transDict) + else: + self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeClean)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) for transDict in toArchive: - if self.shifterProxy: - self._executeArchive(transDict) - else: - self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % - transDict) - executeWithUserProxy(self._executeArchive)(transDict, - proxyUserDN=transDict['AuthorDN'], - proxyUserGroup=transDict['AuthorGroup']) + if self.shifterProxy: + self._executeArchive(transDict) + else: + self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" % + transDict) + executeWithUserProxy(self._executeArchive)(transDict, + proxyUserDN=transDict['AuthorDN'], + proxyUserGroup=transDict['AuthorGroup']) # Remove JobIDs that were unknown to the TransformationSystem jobGroupsToCheck = [str(transDict['TransformationID']).zfill(8) for transDict in toClean + toArchive] res = self.jobMonitoringClient.getJobs({'JobGroup': jobGroupsToCheck}) if not res['OK']: - return res + return res jobIDsToRemove = [int(jobID) for jobID in res['Value']] res = self.__removeWMSTasks(jobIDsToRemove) if not res['OK']: - return res + return res return S_OK() diff --git a/TransformationSystem/DB/TransformationDB.py b/TransformationSystem/DB/TransformationDB.py index 3d72e58f0b6..1b1fd24ceb1 100755 --- a/TransformationSystem/DB/TransformationDB.py +++ b/TransformationSystem/DB/TransformationDB.py @@ -550,7 +550,7 @@ def getTransformationFiles(self, condDict=None, older=None, newer=None, timeStam if not res['OK']: return res originalFileIDs = res['Value'][0] - condDict['FileID'] = list(originalFileIDs) + condDict['FileID'] = list(originalFileIDs) for val in condDict.itervalues(): if not val: diff --git a/TransformationSystem/Utilities/TransformationInfo.py b/TransformationSystem/Utilities/TransformationInfo.py index c47da7dce90..a6a0bc97938 100644 --- a/TransformationSystem/Utilities/TransformationInfo.py +++ b/TransformationSystem/Utilities/TransformationInfo.py @@ -152,10 +152,10 @@ def cleanOutputs(self, jobInfo): if not result['OK']: self.log.error("Failed to remove LFNs", result['Message']) raise RuntimeError("Failed to remove LFNs: %s" % result['Message']) - for lfn, err in result['Value']['Failed'].items(): - reason = str(err) - errorReasons[reason].append(lfn) - successfullyRemoved += len(result['Value']['Successful']) + for lfn, err in result['Value']['Failed'].items(): + reason = str(err) + errorReasons[reason].append(lfn) + successfullyRemoved += len(result['Value']['Successful']) for reason, lfns in errorReasons.items(): self.log.error("Failed to remove %d files with error: %s" % (len(lfns), reason)) self.log.notice("Successfully removed %d files" % successfullyRemoved) diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index 30043850e8e..780db269479 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -391,7 +391,7 @@ def test_JobStateUpdateAndJobMonitoringMultuple(self): try: self.assertTrue( res['Value'].get('Received') + - res['Value'].get('Waiting') >= int(len(lfnss) * len(types))) + res['Value'].get('Waiting') >= int(len(lfnss) * len(types))) except TypeError: pass res = jobMonitor.getJobsSummary(jobIDs)