Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 132 additions & 70 deletions Core/Utilities/File.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
By default on Error they return None.
"""

#pylint: skip-file
## getGlobbedFiles gives "RuntimeError: maximum recursion depth exceeded" in pylint
# pylint: skip-file
# getGlobbedFiles gives "RuntimeError: maximum recursion depth exceeded" in pylint

import os
import hashlib
Expand All @@ -17,20 +17,42 @@

__RCSID__ = "$Id$"

def mkDir( path ):
# Number of bytes contained in one of each supported size unit.
# The factors are binary (powers of 1024). Strictly speaking the kilo prefix
# should be spelled 'kB', but all the keys here are upper case.
SIZE_UNIT_CONVERSION = {
  'B': 1,
  'KB': 1024,
  'MB': 1024 ** 2,
  'GB': 1024 ** 3,
  'TB': 1024 ** 4,
  'PB': 1024 ** 5,
}


def mkDir(path):
  """ Create a directory and any missing parents, like 'mkdir -p path'.

      Nothing is done, and nothing is raised, when the directory is already there.

      :param string path: directory to create
      :raises OSError: when the creation fails for any reason other than the
                       directory already existing
  """
  if os.path.isdir(path):
    return
  try:
    os.makedirs(path)
  except OSError as osError:
    # The directory may have been created between the check above and makedirs:
    # that is not an error as long as what exists now really is a directory.
    createdMeanwhile = osError.errno == errno.EEXIST and os.path.isdir(path)
    if not createdMeanwhile:
      raise

def mkLink( src, dst ):

def mkLink(src, dst):
""" Protected creation of simbolic link
"""
try:
Expand All @@ -41,7 +63,8 @@ def mkLink( src, dst ):
else:
raise

def makeGuid( fileName = None ):

def makeGuid(fileName=None):
"""Utility to create GUID. If a filename is provided the
GUID will correspond to its content's hexadecimal md5 checksum.
Otherwise a random seed is used to create the GUID.
Expand All @@ -55,50 +78,52 @@ def makeGuid( fileName = None ):
myMd5 = hashlib.md5()
if fileName:
try:
with open( fileName, 'r' ) as fd:
data = fd.read( 10 * 1024 * 1024 )
myMd5.update( data )
except:
with open(fileName, 'r') as fd:
data = fd.read(10 * 1024 * 1024)
myMd5.update(data)
except BaseException:
return None
else:
myMd5.update( str( random.getrandbits( 128 ) ) )
myMd5.update(str(random.getrandbits(128)))

md5HexString = myMd5.hexdigest().upper()
return generateGuid( md5HexString, "MD5" )
return generateGuid(md5HexString, "MD5")


def _formatGuid(hexString):
  """ Lay out the first 32 characters of a string as an upper case 8-4-4-4-12 GUID
  """
  guid = "%s-%s-%s-%s-%s" % (hexString[0:8],
                             hexString[8:12],
                             hexString[12:16],
                             hexString[16:20],
                             hexString[20:32])
  return guid.upper()


def generateGuid(checksum, checksumtype):
  """ Generate a GUID based on the file checksum

      An "MD5" checksum is used as is, an "Adler32" checksum is left padded with
      zeros to 32 characters. When the checksum is empty or its type is neither of
      those, the GUID is built from the md5 of a random 128 bit number instead.

      :param checksum: checksum of the file
      :param string checksumtype: "MD5" or "Adler32"
      :return: upper case GUID string in the 8-4-4-4-12 layout
  """
  if checksum:
    if checksumtype == "MD5":
      checksumString = checksum
    elif checksumtype == "Adler32":
      checksumString = str(checksum).zfill(32)
    else:
      checksumString = ''
    if checksumString:
      return _formatGuid(checksumString)

  # Failed to use the check sum, generate a new guid.
  # The seed is encoded explicitly because hashlib only accepts bytes on Python 3;
  # on Python 2 encoding the digit string is a no-op.
  randomSeed = str(random.getrandbits(128)).encode('ascii')
  return _formatGuid(hashlib.md5(randomSeed).hexdigest())

def checkGuid( guid ):

def checkGuid(guid):
"""Checks whether a supplied GUID is of the correct format.
The guid is a string of 36 characters [0-9A-F] long split into 5 parts of length 8-4-4-4-12.

Expand All @@ -110,16 +135,17 @@ def checkGuid( guid ):
:param string guid: string to be checked
:return: True (False) if supplied string is (not) a valid GUID.
"""
reGUID = re.compile( "^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$" )
if reGUID.match( guid.upper() ):
reGUID = re.compile("^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$")
if reGUID.match(guid.upper()):
return True
else:
guid = [ len( x ) for x in guid.split( "-" ) ]
if ( guid == [ 8, 4, 4, 4, 12 ] ):
guid = [len(x) for x in guid.split("-")]
if (guid == [8, 4, 4, 4, 12]):
return True
return False

def getSize( fileName ):

def getSize(fileName):
"""Get size of a file.

:param string fileName: name of file to be checked
Expand All @@ -132,87 +158,91 @@ def getSize( fileName ):

"""
try:
return os.stat( fileName )[6]
return os.stat(fileName)[6]
except OSError:
return - 1

def _totalSizeUnder(path):
  """ Sum the sizes of the regular files found at or below an already resolved path
  """
  if os.path.isdir(path):
    # Directory entries are real names, not patterns: they must not be globbed
    # again, otherwise names containing '[', '*' or '?' are lost or matched
    # against other files.
    return sum(_totalSizeUnder(os.path.join(path, content)) for content in os.listdir(path))
  if os.path.isfile(path):
    size = getSize(path)
    # getSize returns -1 when the file cannot be stat'ed: count it as empty
    return 0 if size == -1 else size
  return 0


def getGlobbedTotalSize(files):
  """Get total size of a list of files or a single file.
  Globs the parameter, so shell-style wildcards (not regular expressions) are allowed.
  Directories are descended recursively.

  :params list files: list or tuple of strings of files, or a single string
  :return: sum of the sizes of all the regular files found
  """
  if isinstance(files, (list, tuple)):
    return sum(getGlobbedTotalSize(entry) for entry in files)
  return sum(_totalSizeUnder(path) for path in glob.glob(files))

def _filesUnder(path):
  """ List the regular files found at or below an already resolved path
  """
  if os.path.isfile(path):
    return [path]
  found = []
  if os.path.isdir(path):
    # Directory entries are real names, not patterns: they must not be globbed
    # again, otherwise names containing '[', '*' or '?' are lost or matched
    # against other files.
    for content in os.listdir(path):
      found += _filesUnder(os.path.join(path, content))
  return found


def getGlobbedFiles(files):
  """Get list of files or a single file.
  Globs the parameter, so shell-style wildcards (not regular expressions) are allowed.
  Directories are descended recursively and only regular files are returned.

  :params list files: list or tuple of strings of files, or a single string
  :return: list of paths of the regular files found
  """
  globbedFiles = []
  if isinstance(files, (list, tuple)):
    for entry in files:
      globbedFiles += getGlobbedFiles(entry)
  else:
    for path in glob.glob(files):
      globbedFiles += _filesUnder(path)
  return globbedFiles

def getCommonPath(files):
  """Get the common path for all files in the file list.

  Entries that are existing directories are used as they are, for any other
  entry its directory name is used.

  :param files: list of strings with paths
  :type files: python:list
  :return: the common path, or "" if the list is empty or the paths do not
           even share the drive part
  """
  def properSplit(dirPath):
    """Splitting of path to drive and path parts for non-Unix file systems.

    :param string dirPath: path
    """
    nDrive, nPath = os.path.splitdrive(dirPath)
    return [nDrive] + [d for d in nPath.split(os.sep) if d.strip()]

  if not files:
    return ""
  commonPath = properSplit(files[0])
  for fileName in files:
    dirPath = fileName if os.path.isdir(fileName) else os.path.dirname(fileName)
    # Keep only the leading components shared with what was common so far
    tPath = []
    for commonPart, newPart in zip(commonPath, properSplit(dirPath)):
      if commonPart != newPart:
        break
      tPath.append(commonPart)
    if not tPath:
      return ""
    commonPath = tPath
  if len(commonPath) == 1:
    # Only the drive part is shared, the common path is the root:
    # os.path.join cannot be called without any component
    return commonPath[0] + os.sep
  return commonPath[0] + os.sep + os.path.join(*commonPath[1:])


def getMD5ForFiles( fileList ):
def getMD5ForFiles(fileList):
"""Calculate md5 for the content of all the files.

:param fileList: list of paths
Expand All @@ -221,15 +251,47 @@ def getMD5ForFiles( fileList ):
fileList.sort()
hashMD5 = hashlib.md5()
for filePath in fileList:
if os.path.isdir( filePath ):
if os.path.isdir(filePath):
continue
with open( filePath, "rb" ) as fd:
buf = fd.read( 4096 )
with open(filePath, "rb") as fd:
buf = fd.read(4096)
while buf:
hashMD5.update( buf )
buf = fd.read( 4096 )
hashMD5.update(buf)
buf = fd.read(4096)
return hashMD5.hexdigest()


def convertSizeUnits(size, srcUnit, dstUnit):
  """ Converts a number from a given source unit to a destination unit.

      Example:
        In [1]: convertSizeUnits(1024, 'B', 'kB')
        Out[1]: 1.0

        In [2]: convertSizeUnits(1024, 'MB', 'kB')
        Out[2]: 1048576.0


      :param size: number to convert
      :param srcUnit: unit of the number. Any of ( 'B', 'kB', 'MB', 'GB', 'TB', 'PB'), case insensitive
      :param dstUnit: unit expected for the return. Any of ( 'B', 'kB', 'MB', 'GB', 'TB', 'PB'), case insensitive

      :returns: the size number converted in the dstUnit, as a float.
                In case of problem -sys.maxsize is returned (negative)
  """
  try:
    # The units are normalised inside the try so that a unit which is not a
    # string ends up in the error return below instead of raising
    srcFactor = SIZE_UNIT_CONVERSION[srcUnit.upper()]
    dstFactor = SIZE_UNIT_CONVERSION[dstUnit.upper()]
    return float(size) * srcFactor / dstFactor

  # TypeError, ValueError: size is not a number
  # AttributeError: srcUnit or dstUnit are not strings
  # KeyError: srcUnit or dstUnit are not in the conversion list
  except (TypeError, ValueError, AttributeError, KeyError):
    return -sys.maxsize


if __name__ == "__main__":
  # Report the total size of every path or glob pattern given on the command line.
  # print is called with a single parenthesised argument so that the line is
  # valid, and prints the same text, on both Python 2 and Python 3.
  for p in sys.argv[1:]:
    print("%s : %s bytes" % (p, getGlobbedTotalSize(p)))
Loading