Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
111b871
added spaceTokenUsage scripts
fstagni Dec 7, 2017
ede4893
moved to functions
fstagni Dec 7, 2017
30c7513
scaffolding for getOccupancy calls
fstagni Dec 8, 2017
23c044d
few fixes
fstagni Dec 8, 2017
7146c26
style
fstagni Dec 8, 2017
a2fdd92
re-migrated
fstagni Dec 8, 2017
884b9f9
just style
fstagni Dec 11, 2017
3acc887
using StorageElement.getOccupancy()
fstagni Dec 11, 2017
5965036
just style
fstagni Dec 11, 2017
bd97a88
bug fix
fstagni Dec 11, 2017
0897a84
Merge branch 'integration' into spaceTokenUsage
fstagni Dec 11, 2017
3528faf
Merge branch 'integration' into spaceTokenUsage
fstagni Dec 12, 2017
acb03a6
Merge branch 'integration' into spaceTokenUsage
fstagni Dec 12, 2017
6ea8435
Merge remote-tracking branch 'origin/spaceTokenUsage' into spaceToken…
fstagni Dec 12, 2017
fbd0783
autopep8 style
fstagni Dec 12, 2017
b11fba1
Merge branch 'integration' into spaceTokenUsage
fstagni Dec 13, 2017
83010ac
Merge remote-tracking branch 'upstream/integration' into spaceTokenUsage
fstagni Dec 19, 2017
87f55a0
FreeDiskSpace replaces SpaceTokenOccupancy
fstagni Dec 19, 2017
739710a
styling
fstagni Dec 19, 2017
871b345
Merge remote-tracking branch 'upstream/integration' into spaceTokenUsage
fstagni Dec 19, 2017
ce2ed75
fix test
fstagni Dec 19, 2017
2c249d7
using unit inside the command
fstagni Dec 20, 2017
f3aa637
bug fix plus style
fstagni Dec 20, 2017
397c2e6
restored function for extensibility
fstagni Jan 8, 2018
5f15112
Merge remote-tracking branch 'upstream/integration' into spaceTokenUsage
fstagni Jan 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions DataManagementSystem/Service/StorageElementHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
removeDirectory() - remove on directory recursively
removeFileList() - remove files in the list
getAdminInfo() - get administration information about the SE status
getFreeDiskSpace() - get the free disk space
getTotalDiskSpace() - get the free total space

The handler implements also the DISET data transfer calls
toClient(), fromClient(), bulkToClient(), bulkFromClient
Expand Down Expand Up @@ -50,32 +52,24 @@
MAX_STORAGE_SIZE = 0
USE_TOKENS = False

UNIT_CONVERSION = {"KB": 1024, "MB": 1024 * 1024, "GB": 1024 * 1024 * 1024, "TB": 1024 * 1024 * 1024 * 1024}


def getDiskSpace(path, size='TB', total=False):
def getDiskSpace(path, total=False):
"""
Returns disk usage of the given path.
If no size is specified, terabytes will be used by default.
Returns disk usage of the given path, in MB.
If total is set to true, the total disk space will be returned instead.
"""

size_to_convert = size.upper()
if size_to_convert not in UNIT_CONVERSION:
return S_ERROR("No valid size specified")
convert = UNIT_CONVERSION[size_to_convert]

try:
st = os.statvfs(path)

if total:
# return total space
queried_size = st.f_blocks
queriedSize = st.f_blocks
else:
# return free space
queried_size = st.f_bavail
queriedSize = st.f_bavail

result = float(queried_size * st.f_frsize) / float(convert)
result = float(queriedSize * st.f_frsize) / float(1024 * 1024)

except OSError as e:
return S_ERROR(errno.EIO, "Error while getting the available disk space: %s" % repr(e))
Expand All @@ -94,7 +88,7 @@ def getTotalDiskSpace():
global BASE_PATH
global MAX_STORAGE_SIZE

result = getDiskSpace(BASE_PATH, size='MB', total=True)
result = getDiskSpace(BASE_PATH, total=True)
if not result['OK']:
return result
totalSpace = result['Value']
Expand All @@ -112,7 +106,7 @@ def getFreeDiskSpace(ignoreMaxStorageSize=True):
global MAX_STORAGE_SIZE
global BASE_PATH

result = getDiskSpace(BASE_PATH, 'MB')
result = getDiskSpace(BASE_PATH)
if not result['OK']:
return result
freeSpace = result['Value']
Expand Down
1 change: 0 additions & 1 deletion ResourceStatusSystem/Agent/CacheFeederAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def initialize(self):
self.rmClient = ResourceManagementClient()

self.commands['Downtime'] = [{'Downtime': {}}]
self.commands['SpaceTokenOccupancy'] = [{'SpaceTokenOccupancy': {}}]
self.commands['GOCDBSync'] = [{'GOCDBSync': {}}]
self.commands['FreeDiskSpace'] = [{'FreeDiskSpace': {}}]

Expand Down
2 changes: 1 addition & 1 deletion ResourceStatusSystem/Command/DowntimeCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _cleanCommand(self, element, elementNames):

currentDate = datetime.utcnow()

if len(uniformResult) == 0:
if not uniformResult:
continue

# get the list of all ongoing DTs from GocDB
Expand Down
165 changes: 95 additions & 70 deletions ResourceStatusSystem/Command/FreeDiskSpaceCommand.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,138 @@
''' FreeDiskSpaceCommand
The Command gets the free space that is left in a Storage Element

The Command gets the free space that is left in a DIRAC Storage Element
Note: there are, still, many references to "space tokens",
for example ResourceManagementClient().selectSpaceTokenOccupancyCache(token=elementName)
This is for historical reasons, and shoud be fixed one day.
For the moment, when you see "token" or "space token" here, just read "StorageElement".

'''

from datetime import datetime
from DIRAC import S_OK, S_ERROR, gLogger
from DIRAC.ResourceStatusSystem.Command.Command import Command
from DIRAC.Core.DISET.RPCClient import RPCClient
from DIRAC.ResourceStatusSystem.Utilities import CSHelpers
from DIRAC.Resources.Storage.StorageElement import StorageElement
__RCSID__ = '$Id:$'

from datetime import datetime

from DIRAC import S_OK, S_ERROR, gLogger
from DIRAC.ResourceStatusSystem.Command.Command import Command
from DIRAC.ResourceStatusSystem.Utilities import CSHelpers
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.ResourceStatusSystem.Client.ResourceManagementClient import ResourceManagementClient

__RCSID__ = '$Id: $'

UNIT_CONVERSION = {"MB": 1, "GB": 1024, "TB": 1024 * 1024}


class FreeDiskSpaceCommand( Command ):
class FreeDiskSpaceCommand(Command):
'''
Uses diskSpace method to get the free space
'''

def __init__( self, args = None, clients = None ):
def __init__(self, args=None, clients=None):

super( FreeDiskSpaceCommand, self ).__init__( args, clients = clients )
super(FreeDiskSpaceCommand, self).__init__(args, clients=clients)

self.rpc = None
self.rsClient = ResourceManagementClient()
self.rmClient = ResourceManagementClient()

def _prepareCommand( self ):
def _prepareCommand(self):
'''
FreeDiskSpaceCommand requires one argument:
- name : <str>
'''

if 'name' not in self.args:
return S_ERROR( '"name" not found in self.args' )
elementName = self.args[ 'name' ]
return S_ERROR('"name" not found in self.args')
elementName = self.args['name']

return S_OK( elementName )
# We keep TB as default as this is what was used (and will still be used)
# in the policy for "space tokens" ("real", "data" SEs)
unit = self.args.get('unit', 'TB')

def doNew( self, masterParams = None ):
return S_OK((elementName, unit))

def doNew(self, masterParams=None):
"""
Gets the total and the free disk space of a DIPS storage element that
is found in the CS and inserts the results in the SpaceTokenOccupancyCache table
Gets the parameters to run, either from the master method or from its
own arguments.

Gets the total and the free disk space of a storage element
and inserts the results in the SpaceTokenOccupancyCache table
of ResourceManagementDB database.

The result is also returned to the caller, not only inserted.
What is inserted in the DB will be in MB,
what is returned will be in the specified unit.
"""

if masterParams is not None:
elementName = masterParams
else:
elementName = self._prepareCommand()
if not elementName[ 'OK' ]:
return elementName

se = StorageElement(elementName)

elementURL = se.getStorageParameters(protocol = "dips")

if elementURL['OK']:
elementURL = se.getStorageParameters(protocol = "dips")['Value']['URLBase']
elementName, unit = masterParams
else:
gLogger.verbose( "Not a DIPS storage element, skipping..." )
return S_OK()

self.rpc = RPCClient( elementURL, timeout=120 )

free = self.rpc.getFreeDiskSpace("/")

if not free[ 'OK' ]:
return free
free = free['Value']

total = self.rpc.getTotalDiskSpace("/")
params = self._prepareCommand()
if not params['OK']:
return params
elementName, unit = params['Value']

if not total[ 'OK' ]:
return total
total = total['Value']
endpointResult = CSHelpers.getStorageElementEndpoint(elementName)
if not endpointResult['OK']:
return endpointResult

if free and free < 1:
free = 1
if total and total < 1:
total = 1

result = self.rsClient.addOrModifySpaceTokenOccupancyCache( endpoint = elementURL,
lastCheckTime = datetime.utcnow(),
free = free, total = total,
token = elementName )
if not result[ 'OK' ]:
se = StorageElement(elementName)
occupancyResult = se.getOccupancy()
if not occupancyResult['OK']:
return occupancyResult
occupancy = occupancyResult['Value']
free = occupancy['Free']
total = occupancy['Total']

results = {'Endpoint': endpointResult['Value'],
'Free': free,
'Total': total,
'ElementName': elementName}
result = self._storeCommand(results)
if not result['OK']:
return result

return S_OK()
# results are normally in 'MB'
unit = unit.upper()
if unit not in UNIT_CONVERSION:
return S_ERROR("No valid unit specified")
convert = UNIT_CONVERSION[unit]
return S_OK({'Free': float(free) / float(convert), 'Total': float(total) / float(convert)})

def _storeCommand(self, results):
""" Here purely for extensibility
"""
return self.rmClient.addOrModifySpaceTokenOccupancyCache(endpoint=results['Endpoint'],
lastCheckTime=datetime.utcnow(),
free=results['Free'],
total=results['Total'],
token=results['ElementName'])

def doCache( self ):
def doCache(self):
"""
This is a method that gets the element's details from the spaceTokenOccupancy cache.
This is a method that gets the element's details from the spaceTokenOccupancyCache DB table.
It will return a dictionary with th results, converted to "correct" unit.
"""

elementName = self._prepareCommand()
if not elementName[ 'OK' ]:
return elementName
params = self._prepareCommand()
if not params['OK']:
return params
elementName, unit = params['Value']

result = self.rsClient.selectSpaceTokenOccupancyCache(token = elementName)
result = self.rmClient.selectSpaceTokenOccupancyCache(token=elementName)

if not result[ 'OK' ]:
if not result['OK']:
return result

return S_OK( result )
# results are normally in 'MB'
free = result['Value'][0][3]
total = result['Value'][0][4]
unit = unit.upper()
if unit not in UNIT_CONVERSION:
return S_ERROR("No valid unit specified")
convert = UNIT_CONVERSION[unit]
return S_OK({'Free': float(free) / float(convert), 'Total': float(total) / float(convert)})

def doMaster( self ):
def doMaster(self):
"""
This method calls the doNew method for each storage element
that exists in the CS.
Expand All @@ -116,9 +141,9 @@ def doMaster( self ):
elements = CSHelpers.getStorageElements()

for name in elements['Value']:
diskSpace = self.doNew( name )
if not diskSpace[ 'OK' ]:
gLogger.error( "Unable to calculate free disk space", "name: %s" % name )
diskSpace = self.doNew(name)
if not diskSpace['OK']:
gLogger.error("Unable to calculate free/total disk space", "name: %s" % name)
continue

return S_OK()
Loading