Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ConfigurationSystem/Client/Helpers/Registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def getUsernameForDN(dn, usersList=None):

:return: S_OK(str)/S_ERROR()
"""
dn = dn.strip()
if not usersList:
result = gConfig.getSections("%s/Users" % gBaseRegistrySection)
if not result['OK']:
Expand Down Expand Up @@ -63,6 +64,7 @@ def getGroupsForDN(dn):

:return: S_OK(list)/S_ERROR() -- contain list of groups
"""
dn = dn.strip()
result = getUsernameForDN(dn)
if not result['OK']:
return result
Expand Down Expand Up @@ -128,6 +130,7 @@ def getHostnameForDN(dn):

:return: S_OK()/S_ERROR()
"""
dn = dn.strip()
result = gConfig.getSections("%s/Hosts" % gBaseRegistrySection)
if not result['OK']:
return result
Expand All @@ -153,6 +156,7 @@ def findDefaultGroupForDN(dn):

:return: S_OK()/S_ERROR()
"""
dn = dn.strip()
result = getUsernameForDN(dn)
if not result['OK']:
return result
Expand Down
2 changes: 1 addition & 1 deletion TransformationSystem/Agent/MCExtensionAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def extendTransformation(self, transID, maxTasks):
else:
statusDict = res['Value']
gLogger.verbose("Current task count for transformation %d" % transID)
for status in sorted(statusDict.keys()):
for status in sorted(statusDict):
statusCount = statusDict[status]
gLogger.verbose("%s : %s" % (status.ljust(20), str(statusCount).rjust(8)))
# Determine the number of tasks to be created
Expand Down
10 changes: 5 additions & 5 deletions TransformationSystem/Agent/TaskManagerAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def _execute(self, threadID):
# Queue was cleared, nothing to do
continue
try:
transID = transIDOPBody.keys()[0]
transID = list(transIDOPBody)[0]
operations = transIDOPBody[transID]['Operations']
if transID not in self.transInQueue:
self._logWarn("Got a transf not in transInQueue...?",
Expand Down Expand Up @@ -327,7 +327,7 @@ def _execute(self, threadID):
def updateTaskStatus(self, transIDOPBody, clients):
""" Updates the task status
"""
transID = transIDOPBody.keys()[0]
transID = list(transIDOPBody)[0]
method = 'updateTaskStatus'

# Get the tasks which are in an UPDATE state, i.e. job statuses + request-specific statuses
Expand Down Expand Up @@ -410,7 +410,7 @@ def updateTaskStatus(self, transIDOPBody, clients):
def updateFileStatus(self, transIDOPBody, clients):
""" Update the files status
"""
transID = transIDOPBody.keys()[0]
transID = list(transIDOPBody)[0]
method = 'updateFileStatus'

timeStamp = str(datetime.datetime.utcnow() - datetime.timedelta(minutes=10))
Expand Down Expand Up @@ -480,7 +480,7 @@ def updateFileStatus(self, transIDOPBody, clients):
def checkReservedTasks(self, transIDOPBody, clients):
""" Checking Reserved tasks
"""
transID = transIDOPBody.keys()[0]
transID = list(transIDOPBody)[0]
method = 'checkReservedTasks'

# Select the tasks which have been in Reserved status for more than 1 hour for selected transformations
Expand Down Expand Up @@ -546,7 +546,7 @@ def submitTasks(self, transIDOPBody, clients):

:return: S_OK/S_ERROR
"""
transID = transIDOPBody.keys()[0]
transID = list(transIDOPBody)[0]
transBody = transIDOPBody[transID]['Body']
owner = transIDOPBody[transID]['Owner']
ownerGroup = transIDOPBody[transID]['OwnerGroup']
Expand Down
94 changes: 86 additions & 8 deletions TransformationSystem/Agent/TransformationCleaningAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.DErrno import cmpError
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient

# # agent's name
AGENT_NAME = 'Transformation/TransformationCleaningAgent'
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(self, *args, **kwargs):
def initialize(self):
""" agent initialisation

reading and setting confing opts
reading and setting config opts

:param self: self reference
"""
Expand Down Expand Up @@ -123,6 +124,8 @@ def initialize(self):
self.reqClient = ReqClient()
# # file catalog client
self.metadataClient = FileCatalogClient()
# # job monitoring client
self.jobMonitoringClient = JobMonitoringClient()

return S_OK()

Expand Down Expand Up @@ -190,6 +193,81 @@ def execute(self):
self.log.error("Could not get the transformations", res['Message'])
return S_OK()

def finalize(self):
    """ Only at finalization: clean ancient transformations (remnants)

    1) get the transformation IDs of the jobs selected by a cut-off date of one year ago
       (the intent is to pick up jobs that are older than 1 year)
    2) find the status of those transformations. Those "Cleaned" and "Archived" will be
       cleaned and archived (again)
    3) remove the jobs that are still registered under the job groups of those transformations

    Why do this here? Basically, it's a race:

    1) the production manager submits a transformation
    2) the TransformationAgent, and a bit later the WorkflowTaskAgent, put such transformation
       in their internal queue, so eventually during their (long-ish) cycle they'll work on it.
    3) 1 minute after creating the transformation, the production manager cleans it
       (by hand, for whatever reason). So, the status is changed to "Cleaning"
    4) the TransformationCleaningAgent cleans what has been created (maybe, nothing),
       then sets the transformation status to "Cleaned" or "Archived"
    5) a bit later the TransformationAgent, and later the WorkflowTaskAgent, kick in,
       creating tasks and jobs for a production that's effectively cleaned
       (but these 2 agents don't know yet).

    Of course, one could make one final check in TransformationAgent or WorkflowTaskAgent,
    but these 2 agents are already doing a lot of stuff, and are pretty heavy.
    So, we should just clean from time to time.
    This is done only when the agent finalizes, and it is quite a light operation anyway.

    :param self: self reference

    :return: S_OK() when everything went fine or there was nothing to do,
             otherwise the failed result of the first call that did not succeed
    """
    cutOffDate = datetime.utcnow() - timedelta(days=365)
    res = self.jobMonitoringClient.getJobGroups(None, cutOffDate)
    if not res['OK']:
        self.log.error("Failed to get job groups", res['Message'])
        return res
    # NOTE(review): the returned job groups are used directly as transformation IDs -- confirm
    transformationIDs = res['Value']
    if not transformationIDs:
        # No job groups selected: nothing to clean, nothing to archive
        return S_OK()

    res = self.transClient.getTransformations({'TransformationID': transformationIDs})
    if not res['OK']:
        self.log.error("Failed to get transformations", res['Message'])
        return res

    # Only transformations already flagged "Cleaned" or "Archived" are processed again
    toClean = []
    toArchive = []
    for transDict in res['Value']:
        if transDict['Status'] == 'Cleaned':
            toClean.append(transDict)
        elif transDict['Status'] == 'Archived':
            toArchive.append(transDict)

    for transDict in toClean:
        if self.shifterProxy:
            self._executeClean(transDict)
        else:
            # No shifter proxy: act with the credentials of the transformation author
            self.log.info("Cleaning transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" %
                          transDict)
            executeWithUserProxy(self._executeClean)(transDict,
                                                     proxyUserDN=transDict['AuthorDN'],
                                                     proxyUserGroup=transDict['AuthorGroup'])

    for transDict in toArchive:
        if self.shifterProxy:
            self._executeArchive(transDict)
        else:
            # No shifter proxy: act with the credentials of the transformation author
            self.log.info("Archiving files for transformation %(TransformationID)s with %(AuthorDN)s, %(AuthorGroup)s" %
                          transDict)
            executeWithUserProxy(self._executeArchive)(transDict,
                                                       proxyUserDN=transDict['AuthorDN'],
                                                       proxyUserGroup=transDict['AuthorGroup'])

    # Remove JobIDs that were unknown to the TransformationSystem.
    # The job group name is the transformation ID zero-padded to 8 characters.
    jobGroupsToCheck = [str(transDict['TransformationID']).zfill(8) for transDict in toClean + toArchive]
    if not jobGroupsToCheck:
        # Never query with an empty 'JobGroup' selection: there is nothing to look up, and an
        # empty condition must not end up selecting (and then removing) unrelated jobs.
        return S_OK()

    res = self.jobMonitoringClient.getJobs({'JobGroup': jobGroupsToCheck})
    if not res['OK']:
        self.log.error("Failed to get jobs of the job groups to check", res['Message'])
        return res
    jobIDsToRemove = [int(jobID) for jobID in res['Value']]
    res = self.__removeWMSTasks(jobIDsToRemove)
    if not res['OK']:
        return res

    return S_OK()

def _executeClean(self, transDict):
"""Clean transformation."""
# if transformation is of type `Replication` or `Removal`, there is nothing to clean.
Expand Down Expand Up @@ -348,7 +426,7 @@ def __getCatalogDirectoryContents(self, directories):
activeDirs.extend(dirContents['SubDirs'])
allFiles.update(dirContents['Files'])
self.log.info("Found %d files" % len(allFiles))
return S_OK(allFiles.keys())
return S_OK(list(allFiles))

def cleanTransformationLogFiles(self, directory):
""" clean up transformation logs from directory :directory:
Expand Down
6 changes: 3 additions & 3 deletions TransformationSystem/Agent/TransformationPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,19 @@ def _ByShare(self, shareType='CPU'):
return res
cpuShares = res['Value']
self.util.logInfo("Obtained the following target shares (%):")
for site in sorted(cpuShares.keys()):
for site in sorted(cpuShares):
self.util.logInfo("%s: %.1f" % (site.ljust(15), cpuShares[site]))

# Get the existing destinations from the transformationDB
res = self.util.getExistingCounters(requestedSites=cpuShares.keys())
res = self.util.getExistingCounters(requestedSites=list(cpuShares))
if not res['OK']:
self.util.logError("Failed to get existing file share", res['Message'])
return res
existingCount = res['Value']
if existingCount:
self.util.logInfo("Existing site utilization (%):")
normalisedExistingCount = self.util._normaliseShares(existingCount.copy()) # pylint: disable=protected-access
for se in sorted(normalisedExistingCount.keys()):
for se in sorted(normalisedExistingCount):
self.util.logInfo("%s: %.1f" % (se.ljust(15), normalisedExistingCount[se]))

# Group the input files by their existing replicas
Expand Down
2 changes: 1 addition & 1 deletion TransformationSystem/Agent/ValidateOutputDataAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def checkTransformationIntegrity(self, transID):
if res['Value']['Failed']:
return S_ERROR("Failed to determine the existance of directories")
directoryExists = res['Value']['Successful']
for directory in sorted(directoryExists.keys()):
for directory in sorted(directoryExists):
if not directoryExists[directory]:
continue
iRes = self.consistencyInspector.catalogDirectoryToSE(directory)
Expand Down
5 changes: 4 additions & 1 deletion TransformationSystem/Client/TaskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def getSubmittedFileStatus(self, fileDicts):
if transID is None:
return S_OK({})

res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': taskFiles.keys()})
res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': list(taskFiles)})
if not res['OK']:
return res
requestFiles = {}
Expand Down Expand Up @@ -668,17 +668,20 @@ def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN):

method = '__prepareTasks'
startTime = time.time()

oJobTemplate = self.jobClass(transBody)
oJobTemplate.setOwner(owner)
oJobTemplate.setOwnerGroup(ownerGroup)
oJobTemplate.setOwnerDN(ownerDN)

try:
site = oJobTemplate.workflow.findParameter('Site').getValue()
except AttributeError:
site = None
jobType = oJobTemplate.workflow.findParameter('JobType').getValue()
templateOK = False
getOutputDataTiming = 0.

for taskID, paramsDict in taskDict.iteritems():
# Create a job for each task and add it to the taskDict
if not templateOK:
Expand Down
2 changes: 1 addition & 1 deletion TransformationSystem/Client/Transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __getattr__(self, name):

def __getParam(self):
if self.item_called == 'Available':
return S_OK(self.paramTypes.keys())
return S_OK(list(self.paramTypes))
if self.item_called == 'Parameters':
return S_OK(self.paramValues)
if self.item_called in self.paramValues:
Expand Down
26 changes: 13 additions & 13 deletions TransformationSystem/Client/TransformationCLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,11 @@ def do_replicas(self, args):
if not res['OK']:
print("failed to get any replica information: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to get replica information for %s: %s" % (lfn, error))
for lfn in sorted(res['Value']['Successful'].keys()):
ses = sorted(res['Value']['Successful'][lfn].keys())
for lfn in sorted(res['Value']['Successful']):
ses = sorted(res['Value']['Successful'][lfn])
outStr = "%s :" % lfn.ljust(100)
for se in ses:
outStr = "%s %s" % (outStr, se.ljust(15))
Expand All @@ -568,10 +568,10 @@ def do_addFile(self, args):
if not res['OK']:
print("failed to add any files: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to add %s: %s" % (lfn, error))
for lfn in sorted(res['Value']['Successful'].keys()):
for lfn in sorted(res['Value']['Successful']):
print("added %s" % lfn)

def do_removeFile(self, args):
Expand All @@ -587,10 +587,10 @@ def do_removeFile(self, args):
if not res['OK']:
print("failed to remove any files: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to remove %s: %s" % (lfn, error))
for lfn in sorted(res['Value']['Successful'].keys()):
for lfn in sorted(res['Value']['Successful']):
print("removed %s" % lfn)

def do_addReplica(self, args):
Expand All @@ -610,10 +610,10 @@ def do_addReplica(self, args):
if not res['OK']:
print("failed to add replica: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to add replica: %s" % (error))
for lfn in sorted(res['Value']['Successful'].keys()):
for lfn in sorted(res['Value']['Successful']):
print("added %s" % lfn)

def do_removeReplica(self, args):
Expand All @@ -633,10 +633,10 @@ def do_removeReplica(self, args):
if not res['OK']:
print("failed to remove replica: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to remove replica: %s" % (error))
for lfn in sorted(res['Value']['Successful'].keys()):
for lfn in sorted(res['Value']['Successful']):
print("removed %s" % lfn)

def do_setReplicaStatus(self, args):
Expand All @@ -663,10 +663,10 @@ def do_setReplicaStatus(self, args):
if not res['OK']:
print("failed to set replica status: %s" % res['Message'])
return
for lfn in sorted(res['Value']['Failed'].keys()):
for lfn in sorted(res['Value']['Failed']):
error = res['Value']['Failed'][lfn]
print("failed to set replica status: %s" % (error))
for lfn in sorted(res['Value']['Successful'].keys()):
for lfn in sorted(res['Value']['Successful']):
print("updated replica status %s" % lfn)


Expand Down
Loading