Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a1fcc42
Added macro on top
ubeda Feb 27, 2012
1c53d23
pyLint changes
ubeda Feb 27, 2012
b21dc4d
pyLint changes
ubeda Feb 27, 2012
1ccbf4a
macro heads on top
ubeda Feb 27, 2012
a0654a0
pyLint modifications
ubeda Feb 27, 2012
8f146c5
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
4ac9492
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
ee7001e
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
fb52f9c
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
4821428
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
245bbcc
pyLint changes ( docstring )
ubeda Feb 27, 2012
bc5c350
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
4552ee6
pyLint changes ( variable renaming )
ubeda Feb 27, 2012
091dc8d
pyLint changes ( disable message )
ubeda Feb 27, 2012
80e6ae1
pyLint changes ( locally disabling )
ubeda Feb 27, 2012
49c9487
Removed unused code
ubeda Feb 27, 2012
189a60c
pyLint changes ( cosmetic )
ubeda Feb 27, 2012
c6c4a44
pyLint changes ( meta = None instead of {} )
ubeda Feb 27, 2012
b240650
pyLint changes ( disabled unused )
ubeda Feb 27, 2012
48d633f
test
ubeda Feb 27, 2012
c30a45b
test
ubeda Feb 27, 2012
286406c
pyLint changes ( variable rename )
ubeda Feb 27, 2012
2e0ea98
pyLint changes ( added docstring )
ubeda Feb 27, 2012
d1cf5ba
pyLint changes ( variable renaming, docstrings )
ubeda Feb 27, 2012
886b35a
pyLint changes ( disable initialized members in initialize )
ubeda Feb 27, 2012
a7da486
pyLint changes ( disabled R0904 )
ubeda Feb 27, 2012
37fcd2e
pyLint changes ( add disable code explanation )
ubeda Feb 27, 2012
b22f91c
pyLint changes ( unused variables, docstring, renaming )
ubeda Feb 27, 2012
92462aa
pyLint changes ( variable renaming ) and typos fixes
ubeda Feb 27, 2012
5b9e218
Merge remote branch 'upstream/integration' into pyLint
ubeda Mar 6, 2012
1f3c046
BUGIX: _checkFloat checks INTEGERS, not datetimes.
ubeda Mar 6, 2012
afb8820
FIX: checked 'OK' key on addOrModify
ubeda Mar 7, 2012
493a6f7
Merge branch 'monkeyINTFix2' into pyLint2
ubeda Mar 8, 2012
543ccfd
FIX: Wrong order of bool sentence
ubeda Mar 8, 2012
c6cb38c
FIX: Moved log level to debug
ubeda Mar 8, 2012
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions ResourceStatusSystem/Agent/CacheCleanerAgent.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
################################################################################
# $HeadURL$
################################################################################
__RCSID__ = "$Id$"
AGENT_NAME = 'ResourceStatus/CleanerAgent'
''' CacheCleanerAgent

This agent cleans the history tables, and the cache ones if entries older
than a certan period.

from datetime import datetime,timedelta
'''

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from datetime import datetime, timedelta

from DIRAC.ResourceStatusSystem import ValidRes
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.ResourceStatusSystem import ValidRes
from DIRAC.ResourceStatusSystem.Client.ResourceStatusClient import ResourceStatusClient
from DIRAC.ResourceStatusSystem.Client.ResourceManagementClient import ResourceManagementClient

__RCSID__ = '$Id: $'
AGENT_NAME = 'ResourceStatus/CleanerAgent'

class CacheCleanerAgent( AgentModule ):
'''
The CacheCleanerAgent tidies up the ResourceStatusDB, namely:
Expand All @@ -30,13 +34,19 @@ class CacheCleanerAgent( AgentModule ):
If you want to know more about the CacheCleanerAgent, scroll down to the end of the
file.
'''

# Too many public methods
# pylint: disable-msg=R0904

def initialize( self ):

# Attribute defined outside __init__
# pylint: disable-msg=W0201

try:

self.rsClient = ResourceStatusClient()
self.rmClient = ResourceManagementClient()
self.rsClient = ResourceStatusClient()
self.rmClient = ResourceManagementClient()
self.historyTables = [ '%sHistory' % x for x in ValidRes ]

return S_OK()
Expand All @@ -57,12 +67,12 @@ def execute( self ):
now = datetime.utcnow().replace( microsecond = 0, second = 0 )
sixMonthsAgo = now - timedelta( days = 180 )

for g in ValidRes:
for granularity in ValidRes:
#deleter = getattr( self.rsClient, 'delete%sHistory' % g )

kwargs = { 'meta' : { 'minor' : { 'DateEnd' : sixMonthsAgo } } }
self.log.info( 'Deleting %sHistory older than %s' % ( g, sixMonthsAgo ) )
res = self.rsClient.deleteElementHistory( g, **kwargs )
self.log.info( 'Deleting %sHistory older than %s' % ( granularity, sixMonthsAgo ) )
res = self.rsClient.deleteElementHistory( granularity, **kwargs )
if not res[ 'OK' ]:
self.log.error( res[ 'Message' ] )

Expand Down
51 changes: 30 additions & 21 deletions ResourceStatusSystem/Agent/CacheFeederAgent.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,50 @@
################################################################################
# $HeadURL: $
################################################################################
__RCSID__ = "$Id: $"
AGENT_NAME = 'ResourceStatus/CacheFeederAgent'
''' CacheFeederAgent

This agent feeds the Cache tables with the outputs of the cache commands.

'''

from datetime import datetime

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule

from DIRAC.ResourceStatusSystem.Client.ResourceManagementClient import ResourceManagementClient
from DIRAC.ResourceStatusSystem.Command.CommandCaller import CommandCaller
from DIRAC.ResourceStatusSystem.Command.ClientsInvoker import ClientsInvoker
from DIRAC.ResourceStatusSystem.Command.knownAPIs import initAPIs

__RCSID__ = '$Id: $'
AGENT_NAME = 'ResourceStatus/CacheFeederAgent'

class CacheFeederAgent( AgentModule ):
'''
The CacheFeederAgent feeds the cache tables for the client and the accounting.
It runs periodically a set of commands, and stores it's results on the
tables.
'''

# Too many public methods
# pylint: disable-msg=R0904

def initialize( self ):

# Attribute defined outside __init__
# pylint: disable-msg=W0201

try:

self.rmClient = ResourceManagementClient()
self.clientsInvoker = ClientsInvoker()

commandsList_ClientsCache = [
commandsListClientsCache = [
( 'ClientsCache_Command', 'JobsEffSimpleEveryOne_Command' ),
( 'ClientsCache_Command', 'PilotsEffSimpleEverySites_Command' ),
( 'ClientsCache_Command', 'DTEverySites_Command' ),
( 'ClientsCache_Command', 'DTEveryResources_Command' )
]

commandsList_AccountingCache = [
commandsListAccountingCache = [
( 'AccountingCache_Command', 'TransferQualityByDestSplitted_Command', ( 2, ), 'Always' ),
( 'AccountingCache_Command', 'FailedTransfersBySourceSplitted_Command', ( 2, ), 'Always' ),
( 'AccountingCache_Command', 'TransferQualityByDestSplittedSite_Command', ( 24, ), 'Hourly' ),
Expand All @@ -51,33 +60,33 @@ def initialize( self ):
( 'AccountingCache_Command', 'RunningJobsBySiteSplitted_Command', ( 8760, ), 'Daily' ),
]

self.commandObjectsList_ClientsCache = []
self.commandObjectsList_AccountingCache = []
self.commandObjectsListClientsCache = []
self.commandObjectsListAccountingCache = []

cc = CommandCaller()

# We know beforehand which APIs are we going to need, so we initialize them
# first, making everything faster.
APIs = [ 'ResourceStatusClient', 'WMSAdministrator', 'ReportGenerator',
'JobsClient', 'PilotsClient', 'GOCDBClient', 'ReportsClient' ]
APIs = initAPIs( APIs, {} )
knownAPIs = [ 'ResourceStatusClient', 'WMSAdministrator', 'ReportGenerator',
'JobsClient', 'PilotsClient', 'GOCDBClient', 'ReportsClient' ]
knownAPIs = initAPIs( knownAPIs, {} )

for command in commandsList_ClientsCache:
for command in commandsListClientsCache:

cObj = cc.setCommandObject( command )
for apiName, apiInstance in APIs.items():
for apiName, apiInstance in knownAPIs.items():
cc.setAPI( cObj, apiName, apiInstance )

self.commandObjectsList_ClientsCache.append( ( command, cObj ) )
self.commandObjectsListClientsCache.append( ( command, cObj ) )

for command in commandsList_AccountingCache:
for command in commandsListAccountingCache:

cObj = cc.setCommandObject( command )
for apiName, apiInstance in APIs.items():
for apiName, apiInstance in knownAPIs.items():
cc.setAPI( cObj, apiName, apiInstance )
cArgs = command[ 2 ]

self.commandObjectsList_AccountingCache.append( ( command, cObj, cArgs ) )
self.commandObjectsListAccountingCache.append( ( command, cObj, cArgs ) )

return S_OK()

Expand All @@ -92,10 +101,10 @@ def execute( self ):

try:

for co in self.commandObjectsList_ClientsCache:
for co in self.commandObjectsListClientsCache:

commandName = co[0][1].split( '_' )[0]
gLogger.info( 'Executed %s' % commandName )
self.log.info( 'Executed %s' % commandName )
try:
self.clientsInvoker.setCommand( co[1] )
res = self.clientsInvoker.doCommand()['Result']
Expand Down Expand Up @@ -140,7 +149,7 @@ def execute( self ):

now = datetime.utcnow().replace( microsecond = 0 )

for co in self.commandObjectsList_AccountingCache:
for co in self.commandObjectsListAccountingCache:

if co[0][3] == 'Hourly':
if now.minute >= 10:
Expand Down
66 changes: 38 additions & 28 deletions ResourceStatusSystem/Agent/RSInspectorAgent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
################################################################################
# $HeadURL: $
################################################################################
__RCSID__ = "$Id: $"
AGENT_NAME = 'ResourceStatus/RSInspectorAgent'
''' RSInspectorAgent

This agent inspect Resources, and evaluates policies that apply.

'''

import Queue, time

Expand All @@ -16,6 +17,9 @@
from DIRAC.ResourceStatusSystem.Utilities.Utils import where
from DIRAC.ResourceStatusSystem.Utilities import CS

__RCSID__ = '$Id: $'
AGENT_NAME = 'ResourceStatus/RSInspectorAgent'

class RSInspectorAgent( AgentModule ):
"""
The RSInspector agent ( ResourceInspectorAgent ) is one of the four
Expand All @@ -28,13 +32,19 @@ class RSInspectorAgent( AgentModule ):
end of the file.
"""

# Too many public methods
# pylint: disable-msg=R0904

def initialize( self ):

# Attribute defined outside __init__
# pylint: disable-msg=W0201

try:
self.rsClient = ResourceStatusClient()
self.ResourcesFreqs = CS.getTypedDictRootedAt( 'CheckingFreqs/ResourcesFreqs' )
self.ResourcesToBeChecked = Queue.Queue()
self.ResourceNamesInCheck = []
self.resourcesFreqs = CS.getTypedDictRootedAt( 'CheckingFreqs/ResourcesFreqs' )
self.resourcesToBeChecked = Queue.Queue()
self.resourceNamesInCheck = []

self.maxNumberOfThreads = self.am_getOption( 'maxThreadsInPool', 1 )
self.threadPool = ThreadPool( self.maxNumberOfThreads,
Expand All @@ -53,9 +63,6 @@ def initialize( self ):
self.log.exception( errorStr )
return S_ERROR( errorStr )

################################################################################
################################################################################

def execute( self ):

try:
Expand All @@ -66,7 +73,7 @@ def execute( self ):
'TokenOwner' ]
kwargs[ 'tokenOwner' ] = 'RS_SVC'

resQuery = self.rsClient.getStuffToCheck( 'Resource', self.ResourcesFreqs, **kwargs )
resQuery = self.rsClient.getStuffToCheck( 'Resource', self.resourcesFreqs, **kwargs )
if not resQuery[ 'OK' ]:
self.log.error( resQuery[ 'Message' ] )
return resQuery
Expand All @@ -76,40 +83,43 @@ def execute( self ):

for resourceTuple in resQuery:

if ( resourceTuple[ 0 ], resourceTuple[ 1 ] ) in self.ResourceNamesInCheck:
if ( resourceTuple[ 0 ], resourceTuple[ 1 ] ) in self.resourceNamesInCheck:
self.log.info( '%s(%s) discarded, already on the queue' % ( resourceTuple[ 0 ], resourceTuple[ 1 ] ) )
continue

resourceL = [ 'Resource' ] + resourceTuple

self.ResourceNamesInCheck.insert( 0, ( resourceTuple[ 0 ], resourceTuple[ 1 ] ) )
self.ResourcesToBeChecked.put( resourceL )
self.resourceNamesInCheck.insert( 0, ( resourceTuple[ 0 ], resourceTuple[ 1 ] ) )
self.resourcesToBeChecked.put( resourceL )

return S_OK()

except Exception, x:
errorStr = where( self, self.execute )
self.log.exception( errorStr,lException=x )
self.log.exception( errorStr, lException = x )
return S_ERROR( errorStr )

################################################################################
################################################################################

def finalize( self ):
if self.ResourceNamesInCheck:
'''
Method executed at the end of the last cycle. It waits until the queue
is empty.
'''
if self.resourceNamesInCheck:
_msg = "Wait for queue to get empty before terminating the agent (%d tasks)"
_msg = _msg % len( self.ResourceNamesInCheck )
_msg = _msg % len( self.resourceNamesInCheck )
self.log.info( _msg )
while self.ResourceNamesInCheck:
while self.resourceNamesInCheck:
time.sleep( 2 )
self.log.info( "Queue is empty, terminating the agent..." )
return S_OK()

################################################################################
################################################################################

def _executeCheck( self, _arg ):

'''
Method executed by the threads in the pool. Picks one element from the
common queue, and enforces policies on that element.
'''
# Init the APIs beforehand, and reuse them.
__APIs__ = [ 'ResourceStatusClient', 'ResourceManagementClient' ]
clients = knownAPIs.initAPIs( __APIs__, {} )
Expand All @@ -118,7 +128,7 @@ def _executeCheck( self, _arg ):

while True:

toBeChecked = self.ResourcesToBeChecked.get()
toBeChecked = self.resourcesToBeChecked.get()

pepDict = { 'granularity' : toBeChecked[ 0 ],
'name' : toBeChecked[ 1 ],
Expand All @@ -131,24 +141,24 @@ def _executeCheck( self, _arg ):

try:

gLogger.info( "Checking Resource %s, with type/status: %s/%s" % \
self.log.info( "Checking Resource %s, with type/status: %s/%s" % \
( pepDict['name'], pepDict['statusType'], pepDict['status'] ) )

pepRes = pep.enforce( **pepDict )
if pepRes.has_key( 'PolicyCombinedResult' ) and pepRes[ 'PolicyCombinedResult' ].has_key( 'Status' ):
pepStatus = pepRes[ 'PolicyCombinedResult' ][ 'Status' ]
if pepStatus != pepDict[ 'status' ]:
gLogger.info( 'Updated Site %s (%s) from %s to %s' %
self.log.info( 'Updated Site %s (%s) from %s to %s' %
( pepDict['name'], pepDict['statusType'], pepDict['status'], pepStatus ))

# remove from InCheck list
self.ResourceNamesInCheck.remove( ( pepDict[ 'name' ], pepDict[ 'statusType' ] ) )
self.resourceNamesInCheck.remove( ( pepDict[ 'name' ], pepDict[ 'statusType' ] ) )

except Exception:
self.log.exception( "RSInspector._executeCheck Checking Resource %s, with type/status: %s/%s" % \
( pepDict['name'], pepDict['statusType'], pepDict['status'] ) )
try:
self.ResourceNamesInCheck.remove( ( pepDict[ 'name' ], pepDict[ 'statusType' ] ) )
self.resourceNamesInCheck.remove( ( pepDict[ 'name' ], pepDict[ 'statusType' ] ) )
except IndexError:
pass

Expand Down
Loading