Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Core/Utilities/ProcessPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,20 @@ def __init__( self, minSize = 2, maxSize = 0, maxQueuedRequests = 10,
self.__bulletCounter = 0
self.__daemonProcess = False
self.__spawnNeededWorkingProcesses()

def stopProcessing( self ):
""" case fire

:param self: self reference
"""
self.finalize()

def startProcessing( self ):
""" restrat processing again

:param self: self reference
"""
self.__draining = False

def setPoolCallback( self, callback ):
""" set ProcessPool callback function
Expand Down Expand Up @@ -737,7 +751,6 @@ def __filicide( self ):
os.kill( worker.pid(), signal.SIGKILL )
del self.__workersDict[pid]


def daemonize( self ):
""" Make ProcessPool a finite being for opening and closing doors between chambers.
Also just run it in a separate background thread to the death of PID 0.
Expand Down
2 changes: 1 addition & 1 deletion DataManagementSystem/private/RequestAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def resetRequests( self ):

:param self: self reference
"""
self.info("resetRequest: will put %s back requests" % len(self.__requestHolder) )
self.log.info("resetRequest: will put %s back requests" % len(self.__requestHolder) )
for requestName, requestTuple in self.__requestHolder.items():
requestString, requestServer = requestTuple
reset = self.requestClient().updateRequest( requestName, requestString, requestServer )
Expand Down
57 changes: 46 additions & 11 deletions Resources/Storage/SRM2Storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
################################################################################
# $HeadURL$
################################################################################

""" This is the SRM2 StorageClass """

__RCSID__ = "$Id$"

## imports
import os
import re
import time
import tempfile
from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE
from types import ListType, StringTypes, StringType, DictType, IntType
## from DIRAC
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Resources.Storage.StorageBase import StorageBase
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
Expand All @@ -10,13 +22,16 @@
from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.File import getSize
from DIRAC.Core.Utilities.Subprocess import shellCall
from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient

from stat import S_ISREG, S_ISDIR, S_IMODE, ST_MODE, ST_SIZE
import types, re, os, time

################################################################################
class SRM2Storage( StorageBase ):
"""
.. class:: SRM2Storage

"""

def __init__( self, storageName, protocol, path, host, port, spaceToken, wspath ):
self.isok = True
Expand Down Expand Up @@ -311,9 +326,9 @@ def getTransportURL( self, path, protocols = False ):
if not protocols['OK']:
return protocols
listProtocols = protocols['Value']
elif type( protocols ) == types.StringType:
elif type( protocols ) == StringType:
listProtocols = [protocols]
elif type( protocols ) == types.ListType:
elif type( protocols ) == ListType:
listProtocols = protocols
else:
return S_ERROR( "SRM2Storage.getTransportURL: Must supply desired protocols to this plug-in." )
Expand Down Expand Up @@ -688,8 +703,28 @@ def __putFile( self, src_file, dest_url, sourceSize ):
nbstreams = 1
gLogger.info( "SRM2Storage.__putFile: Using %d streams" % nbstreams )
gLogger.info( "SRM2Storage.__putFile: Executing transfer of %s to %s" % ( src_url, dest_url ) )

## create pipe
pipeName = "%s/%s" % ( tempfile.mktemp(), os.path.basename( src_file ) )
useFIFO = False
try:
os.mkfifo( pipeName )
ret = shellCall( cmdSeq = "dd if=%s of=%s bs=%s" % ( src_file, pipeName, "32M" ), timeout = 10 )
if ret["OK"]:
useFIFO = True
gLogger.debug("SRM2Storage.__putFile: Pipe %s created" % pipeName )
except OSError, error:
gLogger.error( "SRM2Storage.__putFile: Unable to create pipe: %s" % str(error) )

if useFIFO:
src_url = "file:%s" % pipeName

res = pythonCall( ( timeout + 10 ), self.__lcg_cp_wrapper, src_url, dest_url,
srctype, dsttype, nbstreams, timeout, src_spacetokendesc, dest_spacetokendesc )

if useFIFO:
os.unlink( pipeName )

if not res['OK']:
# Remove the failed replica, just in case
result = self.__executeOperation( dest_url, 'removeFile' )
Expand Down Expand Up @@ -737,16 +772,16 @@ def __lcg_cp_wrapper( self, src_url, dest_url, srctype, dsttype, nbstreams,
dsttype, self.nobdii, self.voName, nbstreams, self.conf_file,
self.insecure, self.verbose, timeout, src_spacetokendesc,
dest_spacetokendesc )
if type( errCode ) not in [types.IntType]:
if type( errCode ) != IntType:
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer",
"%s %s" % ( errCode, type( errCode ) ) )
if type( errCode ) in [types.ListType]:
if type( errCode ) != ListType:
msg = []
for err in errCode:
msg.append( '%s of type %s' % ( err, type( err ) ) )
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was List:\n" , "\n".join( msg ) )
return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errCode was not an integer" )
if type( errStr ) not in types.StringTypes:
if type( errStr ) not in StringTypes:
gLogger.error( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string",
"%s %s" % ( errCode, type( errStr ) ) )
return S_ERROR( "SRM2Storage.__lcg_cp_wrapper: Returned errStr was not a string" )
Expand Down Expand Up @@ -1317,13 +1352,13 @@ def __removeSubDirectories( self, subDirectories ):
return resDict

def checkArgumentFormat( self, path ):
if type( path ) in types.StringTypes:
if type( path ) in StringTypes:
urls = {path:False}
elif type( path ) == types.ListType:
elif type( path ) == ListType:
urls = {}
for url in path:
urls[url] = False
elif type( path ) == types.DictType:
elif type( path ) == DictType:
urls = path
else:
return S_ERROR( "SRM2Storage.checkArgumentFormat: Supplied path is not of the correct format." )
Expand Down