From e32252be82cbe84b1ab813909a31f9d9f44f05d3 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Mon, 15 Jun 2020 10:32:42 +0200 Subject: [PATCH 1/2] RMS/DMS: cancel FTS operations if the matching RMS request does not exist anymore --- DataManagementSystem/Agent/FTS3Agent.py | 38 +- RequestManagementSystem/DB/RequestDB.py | 735 +++++++++++------------- 2 files changed, 375 insertions(+), 398 deletions(-) diff --git a/DataManagementSystem/Agent/FTS3Agent.py b/DataManagementSystem/Agent/FTS3Agent.py index 47fef84be99..1adeaca1eb6 100644 --- a/DataManagementSystem/Agent/FTS3Agent.py +++ b/DataManagementSystem/Agent/FTS3Agent.py @@ -354,24 +354,36 @@ def _treatOperation(self, operation): continueOperationProcessing = True # Check the status of the associated RMS Request. - # If it is canceled then we will not create new FTS3Jobs, and mark + # If it is canceled or does not exist anymore then we will not create new FTS3Jobs, and mark # this as FTS3Operation canceled. if operation.rmsReqID: res = ReqClient().getRequestStatus(operation.rmsReqID) if not res['OK']: - log.error("Could not get request status", res) - return operation, res - rmsReqStatus = res['Value'] - - if rmsReqStatus == 'Canceled': - log.info( - "The RMS Request is canceled, canceling the FTS3Operation", - "rmsReqID: %s, FTS3OperationID: %s" % - (operation.rmsReqID, - operation.operationID)) - operation.status = 'Canceled' - continueOperationProcessing = False + # If the Request does not exist anymore + if cmpError(res, errno.ENOENT): + log.info( + "The RMS Request does not exist anymore, canceling the FTS3Operation", + "rmsReqID: %s, FTS3OperationID: %s" % + (operation.rmsReqID, + operation.operationID)) + operation.status = 'Canceled' + continueOperationProcessing = False + else: + log.error("Could not get request status", res) + return operation, res + + else: + rmsReqStatus = res['Value'] + + if rmsReqStatus == 'Canceled': + log.info( + "The RMS Request is canceled, canceling the FTS3Operation", + "rmsReqID: %s, FTS3OperationID: %s" % + (operation.rmsReqID, + operation.operationID)) + operation.status = 'Canceled' + continueOperationProcessing = False if continueOperationProcessing: res = operation.prepareNewJobs( diff --git a/RequestManagementSystem/DB/RequestDB.py b/RequestManagementSystem/DB/RequestDB.py index e84a1a8360c..4f9a9e1b813 100644 --- a/RequestManagementSystem/DB/RequestDB.py +++ b/RequestManagementSystem/DB/RequestDB.py @@ -16,6 +16,7 @@ db holding Request, Operation and File """ +import errno import random import datetime @@ -24,7 +25,7 @@ from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload_all, mapper from sqlalchemy.sql import update from sqlalchemy import create_engine, func, Table, Column, MetaData, ForeignKey, \ - Integer, String, DateTime, Enum, BLOB, BigInteger, distinct + Integer, String, DateTime, Enum, BLOB, BigInteger, distinct # # from DIRAC from DIRAC import S_OK, S_ERROR, gLogger @@ -34,7 +35,6 @@ from DIRAC.ConfigurationSystem.Client.Utilities import getDBParameters - __RCSID__ = "$Id $" @@ -44,234 +44,225 @@ # Description of the file table -fileTable = Table( 'File', metadata, - Column( 'FileID', Integer, primary_key = True ), - Column( 'OperationID', Integer, - ForeignKey( 'Operation.OperationID', ondelete = 'CASCADE' ), - nullable = False ), - Column( 'Status', Enum( 'Waiting', 'Done', 'Failed', 'Scheduled' ), server_default = 'Waiting' ), - Column( 'LFN', String( 255 ), index = True ), - Column( 'PFN', String( 255 ) ), - Column( 'ChecksumType', Enum( 'ADLER32', 'MD5', 'SHA1', '' ), server_default = '' ), - Column( 'Checksum', String( 255 ) ), - Column( 'GUID', String( 36 ) ), - Column( 'Size', BigInteger ), - Column( 'Attempt', Integer ), - Column( 'Error', String( 255 ) ), - mysql_engine = 'InnoDB' ) +fileTable = Table('File', metadata, + Column('FileID', Integer, primary_key=True), + Column('OperationID', Integer, + ForeignKey('Operation.OperationID', ondelete='CASCADE'), + nullable=False), + Column('Status', Enum('Waiting', 'Done', 'Failed', 'Scheduled'), server_default='Waiting'), + Column('LFN', String(255), index=True), + Column('PFN', String(255)), + Column('ChecksumType', Enum('ADLER32', 'MD5', 'SHA1', ''), server_default=''), + Column('Checksum', String(255)), + Column('GUID', String(36)), + Column('Size', BigInteger), + Column('Attempt', Integer), + Column('Error', String(255)), + mysql_engine='InnoDB') # Map the File object to the fileTable, with a few special attributes -mapper( File, fileTable, properties = {'_Status': fileTable.c.Status, - '_LFN': fileTable.c.LFN, - '_ChecksumType' : fileTable.c.ChecksumType, - '_GUID' : fileTable.c.GUID} ) +mapper(File, fileTable, properties={'_Status': fileTable.c.Status, + '_LFN': fileTable.c.LFN, + '_ChecksumType': fileTable.c.ChecksumType, + '_GUID': fileTable.c.GUID}) # Description of the Operation table -operationTable = Table( 'Operation', metadata, - Column( 'TargetSE', String( 255 ) ), - Column( 'CreationTime', DateTime ), - Column( 'SourceSE', String( 255 ) ), - Column( 'Arguments', BLOB ), - Column( 'Error', String( 255 ) ), - Column( 'Type', String( 64 ), nullable = False ), - Column( 'Order', Integer, nullable = False ), - Column( 'Status', - Enum( 'Waiting', 'Assigned', 'Queued', 'Done', 'Failed', 'Canceled', 'Scheduled' ), - server_default = 'Queued' ), - Column( 'LastUpdate', DateTime ), - Column( 'SubmitTime', DateTime ), - Column( 'Catalog', String( 255 ) ), - Column( 'OperationID', Integer, primary_key = True ), - Column( 'RequestID', Integer, - ForeignKey( 'Request.RequestID', ondelete = 'CASCADE' ), - nullable = False ), - mysql_engine = 'InnoDB' ) +operationTable = Table('Operation', metadata, + Column('TargetSE', String(255)), + Column('CreationTime', DateTime), + Column('SourceSE', String(255)), + Column('Arguments', BLOB), + Column('Error', String(255)), + Column('Type', String(64), nullable=False), + Column('Order', Integer, nullable=False), + Column('Status', + Enum('Waiting', 'Assigned', 'Queued', 'Done', 'Failed', 'Canceled', 'Scheduled'), + server_default='Queued'), + Column('LastUpdate', DateTime), + Column('SubmitTime', DateTime), + Column('Catalog', String(255)), + Column('OperationID', Integer, primary_key=True), + Column('RequestID', Integer, + ForeignKey('Request.RequestID', ondelete='CASCADE'), + nullable=False), + mysql_engine='InnoDB') # Map the Operation object to the operationTable, with a few special attributes -mapper( Operation, operationTable, properties = {'_CreationTime': operationTable.c.CreationTime, - '_Order': operationTable.c.Order, - '_Status': operationTable.c.Status, - '_LastUpdate': operationTable.c.LastUpdate, - '_SubmitTime': operationTable.c.SubmitTime, - '_Catalog': operationTable.c.Catalog, - '__files__':relationship( File, - backref = backref( '_parent', lazy = 'immediate' ), - lazy = 'immediate', - passive_deletes = True, - cascade = "all, delete-orphan" )} ) +mapper(Operation, operationTable, properties={'_CreationTime': operationTable.c.CreationTime, + '_Order': operationTable.c.Order, + '_Status': operationTable.c.Status, + '_LastUpdate': operationTable.c.LastUpdate, + '_SubmitTime': operationTable.c.SubmitTime, + '_Catalog': operationTable.c.Catalog, + '__files__': relationship(File, + backref=backref('_parent', lazy='immediate'), + lazy='immediate', + passive_deletes=True, + cascade="all, delete-orphan")}) # Description of the Request Table -requestTable = Table( 'Request', metadata, - Column( 'DIRACSetup', String( 32 ) ), - Column( 'CreationTime', DateTime ), - Column( 'JobID', Integer, server_default = '0' ), - Column( 'OwnerDN', String( 255 ) ), - Column( 'RequestName', String( 255 ), nullable = False ), - Column( 'Error', String( 255 ) ), - Column( 'Status', - Enum( 'Waiting', 'Assigned', 'Done', 'Failed', 'Canceled', 'Scheduled' ), - server_default = 'Waiting' ), - Column( 'LastUpdate', DateTime ), - Column( 'OwnerGroup', String( 32 ) ), - Column( 'SubmitTime', DateTime ), - Column( 'RequestID', Integer, primary_key = True ), - Column( 'SourceComponent', BLOB ), - Column( 'NotBefore', DateTime ), - mysql_engine = 'InnoDB' ) +requestTable = Table('Request', metadata, + Column('DIRACSetup', String(32)), + Column('CreationTime', DateTime), + Column('JobID', Integer, server_default='0'), + Column('OwnerDN', String(255)), + Column('RequestName', String(255), nullable=False), + Column('Error', String(255)), + Column('Status', + Enum('Waiting', 'Assigned', 'Done', 'Failed', 'Canceled', 'Scheduled'), + server_default='Waiting'), + Column('LastUpdate', DateTime), + Column('OwnerGroup', String(32)), + Column('SubmitTime', DateTime), + Column('RequestID', Integer, primary_key=True), + Column('SourceComponent', BLOB), + Column('NotBefore', DateTime), + mysql_engine='InnoDB') # Map the Request object to the requestTable, with a few special attributes -mapper( Request, requestTable, properties = {'_CreationTime': requestTable.c.CreationTime, - '_Status': requestTable.c.Status, - '_LastUpdate': requestTable.c.LastUpdate, - '_SubmitTime': requestTable.c.SubmitTime, - '_NotBefore': requestTable.c.NotBefore, - '__operations__' : relationship( Operation, - backref = backref( '_parent', - lazy = 'immediate' ), - order_by = operationTable.c.Order, - lazy = 'immediate', - passive_deletes = True, - cascade = "all, delete-orphan" )} ) - +mapper(Request, requestTable, properties={'_CreationTime': requestTable.c.CreationTime, + '_Status': requestTable.c.Status, + '_LastUpdate': requestTable.c.LastUpdate, + '_SubmitTime': requestTable.c.SubmitTime, + '_NotBefore': requestTable.c.NotBefore, + '__operations__': relationship(Operation, + backref=backref('_parent', + lazy='immediate'), + order_by=operationTable.c.Order, + lazy='immediate', + passive_deletes=True, + cascade="all, delete-orphan")}) ######################################################################## -class RequestDB( object ): +class RequestDB(object): """ .. class:: RequestDB db holding requests """ - - def __getDBConnectionInfo( self, fullname ): + def __getDBConnectionInfo(self, fullname): """ Collect from the CS all the info needed to connect to the DB. This should be in a base class eventually """ - result = getDBParameters( fullname ) - if not result[ 'OK' ]: - raise Exception( 'Cannot get database parameters: %s' % result[ 'Message' ] ) + result = getDBParameters(fullname) + if not result['OK']: + raise Exception('Cannot get database parameters: %s' % result['Message']) - dbParameters = result[ 'Value' ] - self.dbHost = dbParameters[ 'Host' ] - self.dbPort = dbParameters[ 'Port' ] - self.dbUser = dbParameters[ 'User' ] - self.dbPass = dbParameters[ 'Password' ] - self.dbName = dbParameters[ 'DBName' ] + dbParameters = result['Value'] + self.dbHost = dbParameters['Host'] + self.dbPort = dbParameters['Port'] + self.dbUser = dbParameters['User'] + self.dbPass = dbParameters['Password'] + self.dbName = dbParameters['DBName'] - - def __init__( self ): + def __init__(self): """c'tor :param self: self reference """ - self.log = gLogger.getSubLogger( 'RequestDB' ) + self.log = gLogger.getSubLogger('RequestDB') # Initialize the connection info - self.__getDBConnectionInfo( 'RequestManagement/ReqDB' ) - - + self.__getDBConnectionInfo('RequestManagement/ReqDB') - runDebug = ( gLogger.getLevel() == 'DEBUG' ) + runDebug = (gLogger.getLevel() == 'DEBUG') self.engine = create_engine('mysql://%s:%s@%s:%s/%s' % (self.dbUser, self.dbPass, self.dbHost, self.dbPort, self.dbName), echo=runDebug, pool_recycle=3600) metadata.bind = self.engine - self.DBSession = sessionmaker( bind = self.engine ) + self.DBSession = sessionmaker(bind=self.engine) - - def createTables( self ): + def createTables(self): """ create tables """ try: - metadata.create_all( self.engine ) + metadata.create_all(self.engine) except Exception as e: - return S_ERROR( e ) + return S_ERROR(e) return S_OK() - - def cancelRequest( self, requestID ): + def cancelRequest(self, requestID): session = self.DBSession() try: - updateRet = session.execute( update( Request )\ - .where( Request.RequestID == requestID )\ - .values( {Request._Status : 'Canceled', - Request._LastUpdate : datetime.datetime.utcnow()\ - .strftime( Request._datetimeFormat )} ) ) + updateRet = session.execute(update(Request) + .where(Request.RequestID == requestID) + .values({Request._Status: 'Canceled', + Request._LastUpdate: datetime.datetime.utcnow() + .strftime(Request._datetimeFormat)})) session.commit() # No row was changed if not updateRet.rowcount: - return S_ERROR( "No such request %s" % requestID ) + return S_ERROR("No such request %s" % requestID) return S_OK() except Exception as e: session.rollback() - self.log.exception( "cancelRequest: unexpected exception", lException = e ) - return S_ERROR( "cancelRequest: unexpected exception %s" % e ) + self.log.exception("cancelRequest: unexpected exception", lException=e) + return S_ERROR("cancelRequest: unexpected exception %s" % e) finally: session.close() - - def putRequest( self, request ): + def putRequest(self, request): """ update or insert request into db :param ~Request.Request request: Request instance """ - session = self.DBSession( expire_on_commit = False ) + session = self.DBSession(expire_on_commit=False) try: try: - if hasattr( request, 'RequestID' ): + if hasattr(request, 'RequestID'): - status = session.query( Request._Status )\ - .filter( Request.RequestID == request.RequestID )\ + status = session.query(Request._Status)\ + .filter(Request.RequestID == request.RequestID)\ .one() if status[0] == 'Canceled': - self.log.info( "Request %s(%s) was canceled, don't put it back" % ( request.RequestID, request.RequestName ) ) - return S_OK( request.RequestID ) + self.log.info("Request %s(%s) was canceled, don't put it back" % (request.RequestID, request.RequestName)) + return S_OK(request.RequestID) - except NoResultFound, e: + except NoResultFound as e: pass # Since the object request is not attached to the session, we merge it to have an update # instead of an insert with duplicate primary key - request = session.merge( request ) - session.add( request ) + request = session.merge(request) + session.add(request) session.commit() session.expunge_all() - return S_OK( request.RequestID ) + return S_OK(request.RequestID) except Exception as e: session.rollback() - self.log.exception( "putRequest: unexpected exception", lException = e ) - return S_ERROR( "putRequest: unexpected exception %s" % e ) + self.log.exception("putRequest: unexpected exception", lException=e) + return S_ERROR("putRequest: unexpected exception %s" % e) finally: session.close() - - def getScheduledRequest( self, operationID ): + def getScheduledRequest(self, operationID): session = self.DBSession() try: - requestID = session.query( Request.RequestID )\ - .join( Request.__operations__ )\ - .filter( Operation.OperationID == operationID )\ - .one() - return self.getRequest( requestID[0] ) + requestID = session.query(Request.RequestID)\ + .join(Request.__operations__)\ + .filter(Operation.OperationID == operationID)\ + .one() + return self.getRequest(requestID[0]) except NoResultFound: return S_OK() finally: @@ -291,9 +282,7 @@ def getScheduledRequest( self, operationID ): # return S_ERROR( "getRequestName: no request found for RequestID=%s" % requestID ) # finally: # session.close() - - - def getRequest( self, reqID = 0, assigned = True ): + def getRequest(self, reqID=0, assigned=True): """ read request for execution :param reqID: request's ID (default 0) If 0, take a pseudo random one @@ -301,8 +290,8 @@ def getRequest( self, reqID = 0, assigned = True ): """ # expire_on_commit is set to False so that we can still use the object after we close the session - session = self.DBSession( expire_on_commit = False ) - log = self.log.getSubLogger( 'getRequest' if assigned else 'peekRequest' ) + session = self.DBSession(expire_on_commit=False) + log = self.log.getSubLogger('getRequest' if assigned else 'peekRequest') requestID = None @@ -311,83 +300,84 @@ def getRequest( self, reqID = 0, assigned = True ): if reqID: requestID = reqID - log.verbose( "selecting request '%s'%s" % ( reqID, ' (Assigned)' if assigned else '' ) ) + log.verbose("selecting request '%s'%s" % (reqID, ' (Assigned)' if assigned else '')) status = None try: - status = session.query( Request._Status )\ - .filter( Request.RequestID == reqID )\ + status = session.query(Request._Status)\ + .filter(Request.RequestID == reqID)\ .one() - except NoResultFound, e: - return S_ERROR( "getRequest: request '%s' not exists" % reqID ) + except NoResultFound as e: + return S_ERROR("getRequest: request '%s' not exists" % reqID) if status and status == "Assigned" and assigned: - return S_ERROR( "getRequest: status of request '%s' is 'Assigned', request cannot be selected" % reqID ) + return S_ERROR("getRequest: status of request '%s' is 'Assigned', request cannot be selected" % reqID) else: - now = datetime.datetime.utcnow().replace( microsecond = 0 ) + now = datetime.datetime.utcnow().replace(microsecond=0) reqIDs = set() try: - reqAscIDs = session.query( Request.RequestID )\ - .filter( Request._Status == 'Waiting' )\ - .filter( Request._NotBefore < now )\ - .order_by( Request._LastUpdate )\ - .limit( 100 )\ + reqAscIDs = session.query(Request.RequestID)\ + .filter(Request._Status == 'Waiting')\ + .filter(Request._NotBefore < now)\ + .order_by(Request._LastUpdate)\ + .limit(100)\ .all() - reqIDs = set( [reqID[0] for reqID in reqAscIDs] ) + reqIDs = set([reqID[0] for reqID in reqAscIDs]) - reqDescIDs = session.query( Request.RequestID )\ - .filter( Request._Status == 'Waiting' )\ - .filter( Request._NotBefore < now )\ - .order_by( Request._LastUpdate.desc() )\ - .limit( 50 )\ + reqDescIDs = session.query(Request.RequestID)\ + .filter(Request._Status == 'Waiting')\ + .filter(Request._NotBefore < now)\ + .order_by(Request._LastUpdate.desc())\ + .limit(50)\ .all() - reqIDs |= set( [reqID[0] for reqID in reqDescIDs] ) + reqIDs |= set([reqID[0] for reqID in reqDescIDs]) # No Waiting requests - except NoResultFound, e: + except NoResultFound as e: return S_OK() if not reqIDs: return S_OK() - reqIDs = list( reqIDs ) - random.shuffle( reqIDs ) + reqIDs = list(reqIDs) + random.shuffle(reqIDs) requestID = reqIDs[0] - # If we are here, the request MUST exist, so no try catch # the joinedload_all is to force the non-lazy loading of all the attributes, especially _parent - request = session.query( Request )\ - .options( joinedload_all( '__operations__.__files__' ) )\ - .filter( Request.RequestID == requestID )\ + request = session.query(Request)\ + .options(joinedload_all('__operations__.__files__'))\ + .filter(Request.RequestID == requestID)\ .one() if not reqID: - log.verbose( "selected request %s('%s')%s" % ( request.RequestID, request.RequestName, ' (Assigned)' if assigned else '' ) ) - + log.verbose( + "selected request %s('%s')%s" % + (request.RequestID, + request.RequestName, + ' (Assigned)' if assigned else '')) if assigned: - session.execute( update( Request )\ - .where( Request.RequestID == requestID )\ - .values( {Request._Status : 'Assigned', - Request._LastUpdate : datetime.datetime.utcnow()\ - .strftime( Request._datetimeFormat )} ) - ) + session.execute(update(Request) + .where(Request.RequestID == requestID) + .values({Request._Status: 'Assigned', + Request._LastUpdate: datetime.datetime.utcnow() + .strftime(Request._datetimeFormat)}) + ) session.commit() session.expunge_all() - return S_OK( request ) + return S_OK(request) except Exception as e: session.rollback() - log.exception( "getRequest: unexpected exception", lException = e ) - return S_ERROR( "getRequest: unexpected exception : %s" % e ) + log.exception("getRequest: unexpected exception", lException=e) + return S_ERROR("getRequest: unexpected exception : %s" % e) finally: session.close() - - def getBulkRequests( self, numberOfRequest = 10, assigned = True ): + def getBulkRequests(self, numberOfRequest=10, assigned=True): """ read as many requests as requested for execution :param int numberOfRequest: Number of Request we want (default 10) @@ -398,8 +388,8 @@ def getBulkRequests( self, numberOfRequest = 10, assigned = True ): """ # expire_on_commit is set to False so that we can still use the object after we close the session - session = self.DBSession( expire_on_commit = False ) - log = self.log.getSubLogger( 'getBulkRequest' if assigned else 'peekBulkRequest' ) + session = self.DBSession(expire_on_commit=False) + log = self.log.getSubLogger('getBulkRequest' if assigned else 'peekBulkRequest') requestDict = {} @@ -407,93 +397,87 @@ def getBulkRequests( self, numberOfRequest = 10, assigned = True ): # If we are here, the request MUST exist, so no try catch # the joinedload_all is to force the non-lazy loading of all the attributes, especially _parent try: - now = datetime.datetime.utcnow().replace( microsecond = 0 ) - requestIDs = session.query( Request.RequestID )\ - .with_for_update()\ - .filter( Request._Status == 'Waiting' )\ - .filter( Request._NotBefore < now )\ - .order_by( Request._LastUpdate )\ - .limit( numberOfRequest )\ - .all() + now = datetime.datetime.utcnow().replace(microsecond=0) + requestIDs = session.query(Request.RequestID)\ + .with_for_update()\ + .filter(Request._Status == 'Waiting')\ + .filter(Request._NotBefore < now)\ + .order_by(Request._LastUpdate)\ + .limit(numberOfRequest)\ + .all() requestIDs = [ridTuple[0] for ridTuple in requestIDs] - log.debug( "Got request ids %s" % requestIDs ) + log.debug("Got request ids %s" % requestIDs) - requests = session.query( Request )\ - .options( joinedload_all( '__operations__.__files__' ) )\ - .filter( Request.RequestID.in_( requestIDs ) )\ + requests = session.query(Request)\ + .options(joinedload_all('__operations__.__files__'))\ + .filter(Request.RequestID.in_(requestIDs))\ .all() - log.debug( "Got %s Request objects " % len( requests ) ) - requestDict = dict( ( req.RequestID, req ) for req in requests ) + log.debug("Got %s Request objects " % len(requests)) + requestDict = dict((req.RequestID, req) for req in requests) # No Waiting requests - except NoResultFound, e: + except NoResultFound as e: pass if assigned and requestDict: - session.execute( update( Request )\ - .where( Request.RequestID.in_( requestDict.keys() ) )\ - .values( {Request._Status : 'Assigned', - Request._LastUpdate : datetime.datetime.utcnow()\ - .strftime( Request._datetimeFormat )} ) - ) + session.execute(update(Request) + .where(Request.RequestID.in_(requestDict.keys())) + .values({Request._Status: 'Assigned', + Request._LastUpdate: datetime.datetime.utcnow() + .strftime(Request._datetimeFormat)}) + ) session.commit() session.expunge_all() except Exception as e: session.rollback() - log.exception( "unexpected exception", lException = e ) - return S_ERROR( "getBulkRequest: unexpected exception : %s" % e ) + log.exception("unexpected exception", lException=e) + return S_ERROR("getBulkRequest: unexpected exception : %s" % e) finally: session.close() - return S_OK( requestDict ) + return S_OK(requestDict) - - - def peekRequest( self, requestID ): + def peekRequest(self, requestID): """ get request (ro), no update on states :param requestID: Request.RequestID """ - return self.getRequest( requestID, False ) - + return self.getRequest(requestID, False) - - def getRequestIDsList( self, statusList = None, limit = None, since = None, until = None, getJobID = False ): + def getRequestIDsList(self, statusList=None, limit=None, since=None, until=None, getJobID=False): """ select requests with status in :statusList: """ - statusList = statusList if statusList else list( Request.FINAL_STATES ) + statusList = statusList if statusList else list(Request.FINAL_STATES) limit = limit if limit else 100 session = self.DBSession() requestIDs = [] try: if getJobID: - reqQuery = session.query( Request.RequestID, Request._Status, Request._LastUpdate, Request.JobID )\ - .filter( Request._Status.in_( statusList ) ) + reqQuery = session.query(Request.RequestID, Request._Status, Request._LastUpdate, Request.JobID)\ + .filter(Request._Status.in_(statusList)) else: - reqQuery = session.query( Request.RequestID, Request._Status, Request._LastUpdate )\ - .filter( Request._Status.in_( statusList ) ) + reqQuery = session.query(Request.RequestID, Request._Status, Request._LastUpdate)\ + .filter(Request._Status.in_(statusList)) if since: - reqQuery = reqQuery.filter( Request._LastUpdate > since ) + reqQuery = reqQuery.filter(Request._LastUpdate > since) if until: - reqQuery = reqQuery.filter( Request._LastUpdate < until ) + reqQuery = reqQuery.filter(Request._LastUpdate < until) - reqQuery = reqQuery.order_by( Request._LastUpdate )\ - .limit( limit ) - requestIDs = [ tuple( reqIDTuple ) for reqIDTuple in reqQuery.all() ] + reqQuery = reqQuery.order_by(Request._LastUpdate)\ + .limit(limit) + requestIDs = [tuple(reqIDTuple) for reqIDTuple in reqQuery.all()] except Exception as e: session.rollback() - self.log.exception( "getRequestIDsList: unexpected exception", lException = e ) - return S_ERROR( "getRequestIDsList: unexpected exception : %s" % e ) + self.log.exception("getRequestIDsList: unexpected exception", lException=e) + return S_ERROR("getRequestIDsList: unexpected exception : %s" % e) finally: session.close() - return S_OK( requestIDs ) - + return S_OK(requestIDs) - - def deleteRequest( self, requestID ): + def deleteRequest(self, requestID): """ delete request given its ID :param str requestID: request.RequestID @@ -503,58 +487,55 @@ def deleteRequest( self, requestID ): session = self.DBSession() try: - session.query( Request ).filter( Request.RequestID == requestID ).delete() + session.query(Request).filter(Request.RequestID == requestID).delete() session.commit() except Exception as e: session.rollback() - self.log.exception( "deleteRequest: unexpected exception", lException = e ) - return S_ERROR( "deleteRequest: unexpected exception : %s" % e ) + self.log.exception("deleteRequest: unexpected exception", lException=e) + return S_ERROR("deleteRequest: unexpected exception : %s" % e) finally: session.close() return S_OK() - - def getDBSummary( self ): + def getDBSummary(self): """ get db summary """ # # this will be returned - retDict = { "Request" : {}, "Operation" : {}, "File" : {} } + retDict = {"Request": {}, "Operation": {}, "File": {}} session = self.DBSession() try: - requestQuery = session.query( Request._Status, func.count( Request.RequestID ) )\ - .group_by( Request._Status )\ + requestQuery = session.query(Request._Status, func.count(Request.RequestID))\ + .group_by(Request._Status)\ .all() for status, count in requestQuery: retDict["Request"][status] = count - operationQuery = session.query( Operation.Type, Operation._Status, func.count( Operation.OperationID ) )\ - .group_by( Operation.Type, Operation._Status )\ + operationQuery = session.query(Operation.Type, Operation._Status, func.count(Operation.OperationID))\ + .group_by(Operation.Type, Operation._Status)\ .all() for oType, status, count in operationQuery: - retDict['Operation'].setdefault( oType, {} )[status] = count - + retDict['Operation'].setdefault(oType, {})[status] = count - fileQuery = session.query( File._Status, func.count( File.FileID ) )\ - .group_by( File._Status )\ + fileQuery = session.query(File._Status, func.count(File.FileID))\ + .group_by(File._Status)\ .all() for status, count in fileQuery: retDict["File"][status] = count except Exception as e: - self.log.exception( "getDBSummary: unexpected exception", lException = e ) - return S_ERROR( "getDBSummary: unexpected exception : %s" % e ) + self.log.exception("getDBSummary: unexpected exception", lException=e) + return S_ERROR("getDBSummary: unexpected exception : %s" % e) finally: session.close() - return S_OK( retDict ) + return S_OK(retDict) - - def getRequestSummaryWeb( self, selectDict, sortList, startItem, maxItems ): + def getRequestSummaryWeb(self, selectDict, sortList, startItem, maxItems): """ Returns a list of Request for the web portal :param dict selectDict: parameter on which to restrain the query {key : Value} @@ -565,30 +546,25 @@ def getRequestSummaryWeb( self, selectDict, sortList, startItem, maxItems ): :param int startItem: start item (for pagination) :param int maxItems: max items (for pagination) """ - - - parameterList = [ 'RequestID', 'RequestName', 'JobID', 'OwnerDN', 'OwnerGroup', - 'Status', "Error", "CreationTime", "LastUpdate"] - + parameterList = ['RequestID', 'RequestName', 'JobID', 'OwnerDN', 'OwnerGroup', + 'Status', "Error", "CreationTime", "LastUpdate"] resultDict = {} session = self.DBSession() try: - summaryQuery = session.query( Request.RequestID, Request.RequestName, - Request.JobID, Request.OwnerDN, Request.OwnerGroup, - Request._Status, Request.Error, - Request._CreationTime, Request._LastUpdate ) - - + summaryQuery = session.query(Request.RequestID, Request.RequestName, + Request.JobID, Request.OwnerDN, Request.OwnerGroup, + Request._Status, Request.Error, + Request._CreationTime, Request._LastUpdate) for key, value in selectDict.items(): if key == 'ToDate': - summaryQuery = summaryQuery.filter( Request._LastUpdate < value ) + summaryQuery = summaryQuery.filter(Request._LastUpdate < value) elif key == 'FromDate': - summaryQuery = summaryQuery.filter( Request._LastUpdate > value ) + summaryQuery = summaryQuery.filter(Request._LastUpdate > value) else: tableName = 'Request' @@ -600,56 +576,55 @@ def getRequestSummaryWeb( self, selectDict, sortList, startItem, maxItems ): elif key == 'Status': key = '_Status' - if isinstance( value, list ): - summaryQuery = summaryQuery.filter( eval( '%s.%s.in_(%s)' % ( tableName, key, value ) ) ) + if isinstance(value, list): + summaryQuery = summaryQuery.filter(eval('%s.%s.in_(%s)' % (tableName, key, value))) else: - summaryQuery = summaryQuery.filter( eval( '%s.%s' % ( tableName, key ) ) == value ) + summaryQuery = summaryQuery.filter(eval('%s.%s' % (tableName, key)) == value) if sortList: - summaryQuery = summaryQuery.order_by( eval( 'Request.%s.%s()' % ( sortList[0][0], sortList[0][1].lower() ) ) ) + summaryQuery = summaryQuery.order_by(eval('Request.%s.%s()' % (sortList[0][0], sortList[0][1].lower()))) try: requestLists = summaryQuery.all() - except NoResultFound, e: + except NoResultFound as e: resultDict['ParameterNames'] = parameterList resultDict['Records'] = [] - return S_OK( resultDict ) + return S_OK(resultDict) except Exception as e: - return S_ERROR( 'Error getting the webSummary %s' % e ) + return S_ERROR('Error getting the webSummary %s' % e) - nRequests = len( requestLists ) + nRequests = len(requestLists) - if startItem <= len( requestLists ): + if startItem <= len(requestLists): firstIndex = startItem else: - return S_ERROR( 'getRequestSummaryWeb: Requested index out of range' ) + return S_ERROR('getRequestSummaryWeb: Requested index out of range') - if ( startItem + maxItems ) <= len( requestLists ): + if (startItem + maxItems) <= len(requestLists): secondIndex = startItem + maxItems else: - secondIndex = len( requestLists ) + secondIndex = len(requestLists) records = [] - for i in range( firstIndex, secondIndex ): + for i in range(firstIndex, secondIndex): row = requestLists[i] - records.append( [ str( x ) for x in row] ) + records.append([str(x) for x in row]) resultDict['ParameterNames'] = parameterList resultDict['Records'] = records resultDict['TotalRecords'] = nRequests - return S_OK( resultDict ) + return S_OK(resultDict) # except Exception as e: - self.log.exception( "getRequestSummaryWeb: unexpected exception", lException = e ) - return S_ERROR( "getRequestSummaryWeb: unexpected exception : %s" % e ) + self.log.exception("getRequestSummaryWeb: unexpected exception", lException=e) + return S_ERROR("getRequestSummaryWeb: unexpected exception : %s" % e) finally: session.close() - - def getRequestCountersWeb( self, groupingAttribute, selectDict ): + def getRequestCountersWeb(self, groupingAttribute, selectDict): """ For the web portal. Returns a dictionary {value : counts} for a given key. The key can be any field from the RequestTable. or "Type", @@ -668,50 +643,47 @@ def getRequestCountersWeb( self, groupingAttribute, selectDict ): groupingAttribute = 'Request.%s' % groupingAttribute try: - summaryQuery = session.query( eval( groupingAttribute ), func.count( Request.RequestID ) ) + summaryQuery = session.query(eval(groupingAttribute), func.count(Request.RequestID)) for key, value in selectDict.items(): if key == 'ToDate': - summaryQuery = summaryQuery.filter( Request._LastUpdate < value ) + summaryQuery = summaryQuery.filter(Request._LastUpdate < value) elif key == 'FromDate': - summaryQuery = summaryQuery.filter( Request._LastUpdate > value ) + summaryQuery = summaryQuery.filter(Request._LastUpdate > value) else: objectType = 'Request' if key == 'Type': - summaryQuery = summaryQuery.join( Request.__operations__ ) + summaryQuery = summaryQuery.join(Request.__operations__) objectType = 'Operation' elif key == 'Status': key = '_Status' - if isinstance( value, list ): - summaryQuery = summaryQuery.filter( eval( '%s.%s.in_(%s)' % ( objectType, key, value ) ) ) + if isinstance(value, list): + summaryQuery = summaryQuery.filter(eval('%s.%s.in_(%s)' % (objectType, key, value))) else: - summaryQuery = summaryQuery.filter( eval( '%s.%s' % ( objectType, key ) ) == value ) + summaryQuery = summaryQuery.filter(eval('%s.%s' % (objectType, key)) == value) - summaryQuery = summaryQuery.group_by( groupingAttribute ) + summaryQuery = summaryQuery.group_by(groupingAttribute) try: requestLists = summaryQuery.all() - resultDict = dict( requestLists ) - except NoResultFound, e: + resultDict = dict(requestLists) + except NoResultFound as e: pass except Exception as e: - return S_ERROR( 'Error getting the webCounters %s' % e ) - + return S_ERROR('Error getting the webCounters %s' % e) - - return S_OK( resultDict ) + return S_OK(resultDict) except Exception as e: - self.log.exception( "getRequestSummaryWeb: unexpected exception", lException = e ) - return S_ERROR( "getRequestSummaryWeb: unexpected exception : %s" % e ) + self.log.exception("getRequestSummaryWeb: unexpected exception", lException=e) + return S_ERROR("getRequestSummaryWeb: unexpected exception : %s" % e) finally: session.close() - - def getDistinctValues( self, tableName, columnName ): + def getDistinctValues(self, tableName, columnName): """ For a given table and a given field, return the list of of distinct values in the DB""" session = self.DBSession() @@ -719,54 +691,52 @@ def getDistinctValues( self, tableName, columnName ): if columnName == 'Status': columnName = '_Status' try: - result = session.query( distinct( eval ( "%s.%s" % ( tableName, columnName ) ) ) ).all() + result = session.query(distinct(eval("%s.%s" % (tableName, columnName)))).all() distinctValues = [dist[0] for dist in result] - except NoResultFound, e: + except NoResultFound as e: pass except Exception as e: - self.log.exception( "getDistinctValues: unexpected exception", lException = e ) - return S_ERROR( "getDistinctValues: unexpected exception : %s" % e ) + self.log.exception("getDistinctValues: unexpected exception", lException=e) + return S_ERROR("getDistinctValues: unexpected exception : %s" % e) finally: session.close() - return S_OK( distinctValues ) - + return S_OK(distinctValues) - def getRequestIDsForJobs( self, jobIDs ): + def getRequestIDsForJobs(self, jobIDs): """ read request ids for jobs given jobIDs :param jobIDs: list of jobIDs :type jobIDs: python:list """ - self.log.debug( "getRequestIDsForJobs: got %s jobIDs to check" % str( jobIDs ) ) + self.log.debug("getRequestIDsForJobs: got %s jobIDs to check" % str(jobIDs)) if not jobIDs: - return S_ERROR( "Must provide jobID list as argument." ) - if isinstance( jobIDs, ( long, int ) ): - jobIDs = [ jobIDs ] - jobIDs = set( jobIDs ) + return S_ERROR("Must provide jobID list as argument.") + if isinstance(jobIDs, (long, int)): + jobIDs = [jobIDs] + jobIDs = set(jobIDs) - reqDict = { "Successful": {}, "Failed": {} } + reqDict = {"Successful": {}, "Failed": {}} session = self.DBSession() try: - ret = session.query( Request.JobID, Request.RequestID )\ - .filter( Request.JobID.in_( jobIDs ) )\ - .all() + ret = session.query(Request.JobID, Request.RequestID)\ + .filter(Request.JobID.in_(jobIDs))\ + .all() - reqDict['Successful'] = dict( ( jobId, reqID ) for jobId, reqID in ret ) - reqDict['Failed'] = dict( ( jobid, 'Request not found' ) for jobid in jobIDs - set( reqDict['Successful'] ) ) + reqDict['Successful'] = dict((jobId, reqID) for jobId, reqID in ret) + reqDict['Failed'] = dict((jobid, 'Request not found') for jobid in jobIDs - set(reqDict['Successful'])) except Exception as e: - self.log.exception( "getRequestIDsForJobs: unexpected exception", lException = e ) - return S_ERROR( "getRequestIDsForJobs: unexpected exception : %s" % e ) + self.log.exception("getRequestIDsForJobs: unexpected exception", lException=e) + return S_ERROR("getRequestIDsForJobs: unexpected exception : %s" % e) finally: session.close() - return S_OK( reqDict ) + return S_OK(reqDict) - - def readRequestsForJobs( self, jobIDs = None ): + def readRequestsForJobs(self, jobIDs=None): """ read request for jobs :param jobIDs: list of JobIDs @@ -774,50 +744,48 @@ def readRequestsForJobs( self, jobIDs = None ): :return: S_OK( "Successful" : { jobID1 : Request, jobID2: Request, ... } "Failed" : { jobID3: "error message", ... } ) """ - self.log.debug( "readRequestForJobs: got %s jobIDs to check" % str( jobIDs ) ) + self.log.debug("readRequestForJobs: got %s jobIDs to check" % str(jobIDs)) if not jobIDs: - return S_ERROR( "Must provide jobID list as argument." ) - if isinstance( jobIDs, ( long, int ) ): - jobIDs = [ jobIDs ] - jobIDs = set( jobIDs ) + return S_ERROR("Must provide jobID list as argument.") + if isinstance(jobIDs, (long, int)): + jobIDs = [jobIDs] + jobIDs = set(jobIDs) - reqDict = { "Successful": {}, "Failed": {} } + reqDict = {"Successful": {}, "Failed": {}} # expire_on_commit is set to False so that we can still use the object after we close the session - session = self.DBSession( expire_on_commit = False ) + session = self.DBSession(expire_on_commit=False) try: - ret = session.query( Request.JobID, Request )\ - .options( joinedload_all( '__operations__.__files__' ) )\ - .filter( Request.JobID.in_( jobIDs ) ).all() + ret = session.query(Request.JobID, Request)\ + .options(joinedload_all('__operations__.__files__'))\ + .filter(Request.JobID.in_(jobIDs)).all() - reqDict['Successful'] = dict( ( jobId, reqObj ) for jobId, reqObj in ret ) + reqDict['Successful'] = dict((jobId, reqObj) for jobId, reqObj in ret) - reqDict['Failed'] = dict( ( jobid, 'Request not found' ) for jobid in jobIDs - set( reqDict['Successful'] ) ) + reqDict['Failed'] = dict((jobid, 'Request not found') for jobid in jobIDs - set(reqDict['Successful'])) session.expunge_all() except Exception as e: - self.log.exception( "readRequestsForJobs: unexpected exception", lException = e ) - return S_ERROR( "readRequestsForJobs: unexpected exception : %s" % e ) + self.log.exception("readRequestsForJobs: unexpected exception", lException=e) + return S_ERROR("readRequestsForJobs: unexpected exception : %s" % e) finally: session.close() - return S_OK( reqDict ) - + return S_OK(reqDict) - def getRequestStatus( self, requestID ): + def getRequestStatus(self, requestID): """ get request status for a given request ID """ - self.log.debug( "getRequestStatus: checking status for '%s' request" % requestID ) + self.log.debug("getRequestStatus: checking status for '%s' request" % requestID) session = self.DBSession() try: - status = session.query( Request._Status ).filter( Request.RequestID == requestID ).one() - except NoResultFound: - return S_ERROR( "Request %s does not exist" % requestID ) + status = session.query(Request._Status).filter(Request.RequestID == requestID).one() + except NoResultFound: + return S_ERROR(errno.ENOENT, "Request %s does not exist" % requestID) finally: session.close() - return S_OK( status[0] ) - + return S_OK(status[0]) - def getRequestFileStatus( self, requestID, lfnList ): + def getRequestFileStatus(self, requestID, lfnList): """ get status for files in request given its id :param str requestID: Request.RequestID @@ -827,71 +795,68 @@ def getRequestFileStatus( self, requestID, lfnList ): session = self.DBSession() try: - res = dict.fromkeys( lfnList, "UNKNOWN" ) - requestRet = session.query( File._LFN, File._Status )\ - .join( Request.__operations__ )\ - .join( Operation.__files__ )\ - .filter( Request.RequestID == requestID )\ - .filter( File._LFN.in_( lfnList ) )\ - .all() + res = dict.fromkeys(lfnList, "UNKNOWN") + requestRet = session.query(File._LFN, File._Status)\ + .join(Request.__operations__)\ + .join(Operation.__files__)\ + .filter(Request.RequestID == requestID)\ + .filter(File._LFN.in_(lfnList))\ + .all() for lfn, status in requestRet: res[lfn] = status - return S_OK( res ) + return S_OK(res) except Exception as e: - self.log.exception( "getRequestFileStatus: unexpected exception", lException = e ) - return S_ERROR( "getRequestFileStatus: unexpected exception : %s" % e ) + self.log.exception("getRequestFileStatus: unexpected exception", lException=e) + return S_ERROR("getRequestFileStatus: unexpected exception : %s" % e) finally: session.close() - - def getRequestInfo( self, requestID ): + def getRequestInfo(self, requestID): """ get request info given Request.RequestID """ session = self.DBSession() try: - requestInfoQuery = session.query( Request.RequestID, Request._Status, Request.RequestName, - Request.JobID, Request.OwnerDN, Request.OwnerGroup, - Request.DIRACSetup, Request.SourceComponent, Request._CreationTime, - Request._SubmitTime, Request._LastUpdate )\ - .filter( Request.RequestID == requestID ) - + requestInfoQuery = session.query(Request.RequestID, Request._Status, Request.RequestName, + Request.JobID, Request.OwnerDN, Request.OwnerGroup, + Request.DIRACSetup, Request.SourceComponent, Request._CreationTime, + Request._SubmitTime, Request._LastUpdate)\ + .filter(Request.RequestID == requestID) try: requestInfo = requestInfoQuery.one() except NoResultFound: - return S_ERROR( 'No such request' ) + return S_ERROR('No such request') - return S_OK( tuple( requestInfo ) ) + return S_OK(tuple(requestInfo)) except Exception as e: - self.log.exception( "getRequestInfo: unexpected exception", lException = e ) - return S_ERROR( "getRequestInfo: unexpected exception : %s" % e ) + self.log.exception("getRequestInfo: unexpected exception", lException=e) + return S_ERROR("getRequestInfo: unexpected exception : %s" % e) finally: session.close() - def getDigest( self, requestID ): + def getDigest(self, requestID): """ get digest for request given its id :param str requestName: request id """ - self.log.debug( "getDigest: will create digest for request '%s'" % requestID ) - request = self.getRequest( requestID, False ) + self.log.debug("getDigest: will create digest for request '%s'" % requestID) + request = self.getRequest(requestID, False) if not request["OK"]: - self.log.error( "getDigest: %s" % request["Message"] ) + self.log.error("getDigest: %s" % request["Message"]) return request request = request["Value"] - if not isinstance( request, Request ): - self.log.info( "getDigest: request '%s' not found" ) + if not isinstance(request, Request): + self.log.info("getDigest: request '%s' not found") return S_OK() return request.getDigest() - - def getRequestIDForName( self, requestName ): + def getRequestIDForName(self, requestName): """ read request id for given name if the name is not unique, an error is returned @@ -901,22 +866,22 @@ def getRequestIDForName( self, requestName ): reqID = 0 try: - ret = session.query( Request.RequestID )\ - .filter( Request.RequestName == requestName )\ + ret = session.query(Request.RequestID)\ + .filter(Request.RequestName == requestName)\ .all() if not ret: - return S_ERROR( 'No such request %s' % requestName ) - elif len( ret ) > 1: - return S_ERROR( 'RequestName %s not unique (%s matches)' % ( requestName, len( ret ) ) ) + return S_ERROR('No such request %s' % requestName) + elif len(ret) > 1: + return S_ERROR('RequestName %s not unique (%s matches)' % (requestName, len(ret))) reqID = ret[0][0] - except NoResultFound, e: - return S_ERROR( 'No such request' ) + except NoResultFound as e: + return S_ERROR('No such request') except Exception as e: - self.log.exception( "getRequestIDsForName: unexpected exception", lException = e ) - return S_ERROR( "getRequestIDsForName: unexpected exception : %s" % e ) + self.log.exception("getRequestIDsForName: unexpected exception", lException=e) + return S_ERROR("getRequestIDsForName: unexpected exception : %s" % e) finally: session.close() - return S_OK( reqID ) + return S_OK(reqID) From 62d719595e3158eea8e22c2f1bae4dd1d3ea33fc Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Mon, 15 Jun 2020 10:32:54 +0200 Subject: [PATCH 2/2] gitignore: ignore env file for vscode --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 0e1c97cd729..7bea1aaba40 100644 --- a/.gitignore +++ b/.gitignore @@ -72,6 +72,7 @@ tests/CI/SERVERCONFIG # VSCode .vscode +.env # docs # this is auto generated