diff --git a/Core/Security/ProxyInfo.py b/Core/Security/ProxyInfo.py index 3cdbcd26ba0..3ac8fcce3ce 100644 --- a/Core/Security/ProxyInfo.py +++ b/Core/Security/ProxyInfo.py @@ -6,7 +6,7 @@ import six import base64 -from DIRAC import S_OK, S_ERROR +from DIRAC import S_OK, S_ERROR, gLogger from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error from DIRAC.Core.Security.VOMS import VOMS @@ -157,7 +157,10 @@ def formatProxyStepsInfoAsString(infoList): if key in stepInfo: value = stepInfo[key] if key == 'serial': - value = base64.b16encode(value) + try: + value = base64.b16encode(value) + except Exception as e: + gLogger.exception("Could not read serial:", lException=e) if key == 'lifetime': secs = value hours = int(secs / 3600) diff --git a/FrameworkSystem/DB/ProxyDB.py b/FrameworkSystem/DB/ProxyDB.py index 346049df20c..97f42cd3ad0 100755 --- a/FrameworkSystem/DB/ProxyDB.py +++ b/FrameworkSystem/DB/ProxyDB.py @@ -714,10 +714,10 @@ def __generateProxyFromProxyProvider(self, userDN, proxyProvider): if not result['OK']: return result pp = result['Value'] - result = pp.getProxy({"DN": userDN}) + result = pp.getProxy(userDN) if not result['OK']: return result - proxyStr = result['Value']['proxy'] + proxyStr = result['Value'] chain = X509Chain() result = chain.loadProxyFromString(proxyStr) if not result['OK']: @@ -1064,9 +1064,12 @@ def setPersistencyFlag(self, userDN, userGroup, persistent=True): if not result['OK']: self.log.error("setPersistencyFlag: Can not retrieve username for DN", userDN) return result - userName = result['Value'] + try: + sUserName = self._escapeString(result['Value'])['Value'] + except KeyError: + return S_ERROR("Can't escape user name") cmd = "INSERT INTO `ProxyDB_Proxies` ( UserName, UserDN, UserGroup, Pem, ExpirationTime, PersistentFlag ) VALUES " - cmd += "( %s, %s, %s, '', UTC_TIMESTAMP(), 'True' )" % (userName, sUserDN, sUserGroup) + cmd += "( %s, %s, %s, '', UTC_TIMESTAMP(), 'True' )" % (sUserName, sUserDN, sUserGroup) else: cmd = "UPDATE `ProxyDB_Proxies` SET PersistentFlag='%s' WHERE UserDN=%s AND UserGroup=%s" % (sqlFlag, sUserDN, diff --git a/Resources/ProxyProvider/DIRACCAProxyProvider.py b/Resources/ProxyProvider/DIRACCAProxyProvider.py index 510d70cb2e1..1ee4319efe4 100644 --- a/Resources/ProxyProvider/DIRACCAProxyProvider.py +++ b/Resources/ProxyProvider/DIRACCAProxyProvider.py @@ -1,18 +1,52 @@ """ ProxyProvider implementation for the proxy generation using local (DIRAC) CA credentials + + This class is a simple, limited CA, its main purpose is to generate a simple proxy for DIRAC users + who do not have any certificate register on the fly. + + Required parameters in the DIRAC configuration for its implementation:: + + section: + + * ProviderType = DIRACCA, + * CertFile = , + * KeyFile = , + * Match = , # For ex.: 'Match = O, OU' + * Supplied = , + * Optional = , + * DNOrder = , # For ex.: 'DNOrder = O, OU, CN, emailAddress' + * : , # For ex.: 'OU = CA' + + Also, as an additional feature, this class can read properties from a simple openssl CA configuration file. + To do this, just set the path to an existing configuration file as a CAConfigFile parameter. In this case, + the distinguished names order in the created proxy will be the same as in the configuration file policy block. + + The Proxy provider supports the following distinguished names + (https://www.cryptosys.net/pki/manpki/pki_distnames.html):: + + SN(surname) + GN(givenName) + C(countryName) + CN(commonName) + L(localityName) + Email(emailAddress) + O(organizationName) + OU(organizationUnitName) + SP,ST(stateOrProvinceName) + SERIALNUMBER(serialNumber) + """ import re import time import random import datetime +import collections from M2Crypto import m2, util, X509, ASN1, EVP, RSA from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error -from DIRAC.Core.Security.X509Certificate import X509Certificate # pylint: disable=import-error -from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Resources.ProxyProvider.ProxyProvider import ProxyProvider __RCSID__ = "$Id$" @@ -33,218 +67,210 @@ def __init__(self, parameters=None): self.match = [] self.supplied = ['CN'] self.optional = ['C', 'O', 'OU', 'emailAddress'] - # Add not supported distributes names - self.fs2nid = X509.X509_Name.nid.copy() - self.fs2nid['DC'] = -1 - self.fs2nid['domainComponent'] = -1 - self.fs2nid['organizationalUnitName'] = 18 - self.fs2nid['countryName'] = 14 - self.n2field = {} # nid: most short or specidied in CS distributes name - self.n2fields = {} # nid: list of distributes names + self.dnList = ['C', 'O', 'OU', 'CN', 'emailAddress'] + # Distinguished names + self.fields2nid = X509.X509_Name.nid.copy() + self.fields2nid['DC'] = -1 # Add DN that is not liested in X509.X509_Name + self.fields2nid['domainComponent'] = -1 # Add DN description that is not liested in X509.X509_Name + self.fields2nid['organizationalUnitName'] = 18 # Add 'OU' description + self.fields2nid['countryName'] = 14 # Add 'C' description + self.fields2nid['SERIALNUMBER'] = 105 # Add 'SERIALNUMBER' distinguished name + self.nid2fields = {} # nid: list of distinguished names # Specify standart fields - for field in self.fs2nid: - if self.fs2nid[field] not in self.n2fields: - self.n2fields[self.fs2nid[field]] = [] - self.n2fields[self.fs2nid[field]].append(field) - for nid in self.n2fields: - for field in self.n2fields[nid]: - if nid not in self.n2field: - self.n2field[nid] = field - self.n2field[nid] = len(field) < len(self.n2field[nid]) and field or self.n2field[nid] + for field, nid in self.fields2nid.items(): + self.nid2fields.setdefault(nid, []).append(field) + self.dnInfoDictCA = {} def setParameters(self, parameters): """ Set new parameters :param dict parameters: provider parameters + + :return: S_OK()/S_ERROR() """ - # If CA configuration file exist + for k, v in parameters.items(): + if not isinstance(v, list) and k in ['Match', 'Supplied', 'Optional', 'DNOrder'] + self.fields2nid.keys(): + parameters[k] = v.replace(', ', ',').split(',') self.parameters = parameters + # If CA configuration file exist if parameters.get('CAConfigFile'): - self.supplied, self.optional = [], [] self.__parseCACFG() if 'Bits' in parameters: self.bits = int(parameters['Bits']) if 'Algoritm' in parameters: self.algoritm = parameters['Algoritm'] if 'Match' in parameters: - self.match = [] - for field in parameters['Match'].replace(' ', '').split(','): - self.match.append(self.fs2nid[field]) + self.match = [self.fields2nid[f] for f in parameters['Match']] if 'Supplied' in parameters: - self.supplied = [] - for field in parameters['Supplied'].replace(' ', '').split(','): - self.supplied.append(self.fs2nid[field]) + self.supplied = [self.fields2nid[f] for f in parameters['Supplied']] if 'Optional' in parameters: - self.optional = [] - for field in parameters['Optional'].replace(' ', '').split(','): - self.optional.append(self.fs2nid[field]) + self.optional = [self.fields2nid[f] for f in parameters['Optional']] + allFields = self.optional + self.supplied + self.match + if 'DNOrder' in parameters: + self.dnList = [] + if not any([any([f in parameters['DNOrder'] for f in self.nid2fields[n]]) for n in allFields]): + return S_ERROR('DNOrder must contain all configured fields.') + for field in parameters['DNOrder']: + if self.fields2nid[field] in allFields: + self.dnList.append(field) + # Set defaults for distridutes names - self.defDict = {} - for field, value in parameters.items(): - if field in self.fs2nid: - self.defDict[field] = value - self.defFieldByNid = dict([[self.fs2nid[field], field] for field in self.defDict]) - for nid in self.n2field: - if nid in self.defFieldByNid: - self.n2field[nid] = self.defFieldByNid[nid] - self.match.sort() - self.supplied.sort() - - def checkStatus(self, userDict=None, sessionDict=None): - """ Read ready to work status of proxy provider + self.nid2defField = {} + for field, value in self.parameters.items(): + if field in self.fields2nid and self.fields2nid[field] in allFields: + self.parameters[self.fields2nid[field]] = value + self.nid2defField[self.fields2nid[field]] = field - :param dict userDict: user description dictionary with possible fields: - FullName, UserName, DN, Email, DiracGroup - :param dict sessionDict: session dictionary + # Read CA certificate + chain = X509Chain() + result = chain.loadChainFromFile(self.parameters['CertFile']) + if result['OK']: + result = chain.getCredentials() + if result['OK']: + result = self.__parseDN(result['Value']['subject']) + if not result['OK']: + return result + self.dnInfoDictCA = result['Value'] + return S_OK() - :return: S_OK(dict)/S_ERROR() -- dictionary contain fields: - - 'Status' with ready to work status[ready, needToAuth] - """ - self.log.info('Ckecking work status of', self.parameters['ProviderName']) - # Evaluate full name and e-mail of the user - fullName = userDict.get('FullName') - eMail = userDict.get('Email') - - reqDNs = userDict.get('DN') or [] - if not isinstance(reqDNs, list): - reqDNs = reqDNs.split(', ') - for reqDN in reqDNs: - # Get the DN info as a dictionary - result = Registry.getProxyProvidersForDN(reqDN) - if not result['OK']: - return result - if self.parameters['ProviderName'] not in result['Value']: - continue - dnDict = dict([field.split('=') for field in reqDN.lstrip('/').split('/')]) - if not fullName: - fullName = dnDict.get('CN') - if not eMail: - eMail = dnDict.get('emailAddress') - self.log.info('Full name:', fullName) - self.log.info('Email:', eMail) - if not fullName: - return S_ERROR("Incomplete user information: no full name found") - if not eMail: - return S_ERROR("Incomplete user information: no email found") - - return S_OK({'Status': 'ready'}) - - def getProxy(self, userDict=None, sessionDict=None): - """ Generate user proxy + def checkStatus(self, userDN): + """ Read ready to work status of proxy provider - :param dict userDict: user description dictionary with possible fields: - FullName, UserName, DN, Email, DiracGroup - :param dict sessionDict: session dictionary + :param str userDN: user DN - :return: S_OK(dict)/S_ERROR() -- dict contain 'proxy' field with is a proxy string + :return: S_OK()/S_ERROR() """ - result = self.getUserDN(userDict, sessionDict, userDN=userDict.get('DN')) + self.log.debug('Ckecking work status of', self.parameters['ProviderName']) + result = self.__parseDN(userDN) if not result['OK']: return result + dnInfoDict = result['Value'] + + try: + userNIDs = [self.fields2nid[f.split('=')[0]] for f in userDN.lstrip('/').split('/')] + except (ValueError, KeyError) as e: + return S_ERROR('Unknown DN field in used DN: %s' % e) + nidOrder = [self.fields2nid[f] for f in self.dnList] + for index, nid in enumerate(userNIDs): + if nid not in nidOrder: + return S_ERROR('"%s" field not found in order.' % + self.nid2defField.get(nid, min(self.nid2fields[nid], key=len))) + if index > nidOrder.index(nid): + return S_ERROR('Bad DNs order') + for i in range(nidOrder.index(nid) - 1): + try: + if userNIDs.index(nidOrder[i]) > index: + return S_ERROR('Bad DNs order') + except (ValueError, KeyError): + continue + for i in range(nidOrder.index(nid) + 1, len(nidOrder)): + try: + if userNIDs.index(nidOrder[i]) < index: + return S_ERROR('Bad DNs order') + except (ValueError, KeyError): + continue - result = self.__createCertM2Crypto() - if not result['OK']: - return result - certStr, keyStr = result['Value'] + for nid in self.supplied: + if nid not in [self.fields2nid[f] for f in dnInfoDict]: + return S_ERROR('Current DN is invalid, "%s" field must be set.' % + self.nid2defField.get(nid, min(self.nid2fields[nid], key=len))) + + for field, values in dnInfoDict.items(): + nid = self.fields2nid[field] + err = 'Current DN is invalid, "%s" field' % field + if nid not in self.supplied + self.match + self.optional: + return S_ERROR('%s is not found for current CA.' % err) + if nid in self.match and not self.dnInfoDictCA[field] == values: + return S_ERROR('%s must be /%s=%s.' % (err, field, + ('/%s=' % field).joing(self.dnInfoDictCA[field]))) + if nid in self.maxDict: + rangeMax = range(min(len(values), len(self.maxDict[nid]))) + if any([True if len(values[i]) > self.maxDict[nid][i] else False for i in rangeMax]): + return S_ERROR('%s values must be less then %s.' % (err, ', '.join(self.maxDict[nid]))) + if nid in self.minDict: + rangeMin = range(min(len(values), len(self.minDict[nid]))) + if any([True if len(values[i]) < self.minDict[nid][i] else False for i in rangeMin]): + return S_ERROR('%s values must be more then %s.' % (err, ', '.join(self.minDict[nid]))) + + result = self.__fillX509Name(field, values) + if not result['OK']: + return result - chain = X509Chain() - result = chain.loadChainFromString(certStr) - if not result['OK']: - return result - result = chain.loadKeyFromString(keyStr) - if not result['OK']: - return result + return S_OK() - result = chain.generateProxyToString(365 * 24 * 3600, rfc=True) - if not result['OK']: - return result - return S_OK({'proxy': result['Value']}) + def getProxy(self, userDN): + """ Generate user proxy - def getUserDN(self, userDict=None, sessionDict=None, userDN=None): + :param str userDN: user DN + + :return: S_OK(str)/S_ERROR() -- contain a proxy string + """ + self.__X509Name = X509.X509_Name() + result = self.checkStatus(userDN) + if result['OK']: + result = self.__createCertM2Crypto() + if result['OK']: + certStr, keyStr = result['Value'] + + chain = X509Chain() + result = chain.loadChainFromString(certStr) + if result['OK']: + result = chain.loadKeyFromString(keyStr) + if result['OK']: + result = chain.generateProxyToString(365 * 24 * 3600, rfc=True) + + return result + + def generateDN(self, **kwargs): """ Get DN of the user certificate that will be created - :param dict userDict: user description dictionary with possible fields: - FullName, UserName, DN, Email, DiracGroup - :param dict sessionDict: session dictionary - :param basestring userDN: user DN + :param dict kwargs: user description dictionary with possible fields: + - FullName or CN + - Email or emailAddress - :return: S_OK()/S_ERROR(), Value is the DN string + :return: S_OK(str)/S_ERROR() -- contain DN """ - userDict = userDict or {} - chain = X509Chain() - result = chain.loadChainFromFile(self.parameters['CertFile']) - if not result['OK']: - return result - result = chain.getCredentials() - if not result['OK']: - return result - caDN = result['Value']['subject'] - caDict = dict([field.split('=') for field in caDN.lstrip('/').split('/')]) - caFieldByNid = dict([[self.fs2nid[field], field] for field in caDict]) - - dnDict = {} - if userDN: - self.log.info('Checking %s user DN' % userDN) - dnDict = dict([field.split('=') for field in userDN.lstrip('/').split('/')]) - dnFieldByNid = dict([[self.fs2nid[field], field] for field in dnDict]) - for nid in self.supplied: - if nid not in dnFieldByNid: - return S_ERROR('Current DN is invalid, "%s" field must be set.' % self.n2field[nid]) - for nid in dnFieldByNid: - if nid not in self.supplied + self.match + self.optional: - return S_ERROR('Current DN is invalid, "%s" field is not found for current CA.' % dnFieldByNid[nid]) - if nid in self.match and not caDict[caFieldByNid[nid]] == dnDict[dnFieldByNid[nid]]: - return S_ERROR('Current DN is invalid, "%s" field must be %s.' % (dnFieldByNid[nid], - caDict[caFieldByNid[nid]])) - if nid in self.maxDict and len(dnDict[dnFieldByNid[nid]]) > self.maxDict[nid]: - return S_ERROR('Current DN is invalid, "%s" field must be less then %s.' % (dnDict[dnFieldByNid[nid]], - self.maxDict[nid])) - if nid in self.minDict and len(dnDict[dnFieldByNid[nid]]) < self.minDict[nid]: - return S_ERROR('Current DN is invalid, "%s" field must be more then %s.' % (dnDict[dnFieldByNid[nid]], - self.minDict[nid])) - for k, v in dnDict.items(): - if self.defDict.get(k): - self.defDict[k] = v - if self.fs2nid[k] == self.fs2nid['CN']: - userDict['FullName'] = v - if self.fs2nid[k] == self.fs2nid['emailAddress']: - userDict['Email'] = v - else: - result = self.checkStatus(userDict) - if not result['OK']: - return result + if kwargs.get('FullName'): + kwargs['CN'] = [kwargs['FullName']] + if kwargs.get('Email'): + kwargs['emailAddress'] = [kwargs['Email']] - # Fill DN subject name - if not userDict.get('FullName'): - return S_ERROR("Incomplete user information: no full name found") - if not userDict.get('Email'): - return S_ERROR("Incomplete user information: no email found") self.__X509Name = X509.X509_Name() - self.log.info('Creating distributes names chain') - # Test match fields - for nid in self.match: - if nid not in caFieldByNid: - return S_ERROR('Distributes name(%s) must be present in CA certificate.' % ', '.join(self.n2fields[nid])) - result = self.__fillX509Name(caFieldByNid[nid], caDict[caFieldByNid[nid]]) + self.log.info('Creating distinguished names chain') + + for nid in self.supplied: + if nid not in [self.fields2nid[f] for f in self.dnList]: + return S_ERROR('DNs order list does not contain supplied DN "%s"' % + self.nid2defField.get(nid, min(self.nid2fields[nid], key=len))) + + for field in self.dnList: + values = [] + nid = self.fields2nid[field] + if nid in self.match: + for field in self.nid2fields[nid]: + if field in self.dnInfoDictCA: + values = self.dnInfoDictCA[field] + if not values: + return S_ERROR('Not found "%s" match DN in CA' % field) + for field in self.nid2fields[nid]: + if kwargs.get(field): + values = kwargs[field] if isinstance(kwargs[field], list) else [kwargs[field]] + if not values and nid in self.supplied: + # Search default value + if nid not in self.nid2defField: + return S_ERROR('No values set for "%s" DN' % min(self.nid2fields[nid], key=len)) + values = self.parameters[nid] + + result = self.__fillX509Name(field, values) if not result['OK']: return result - # Test supplied fields - for nid in self.supplied: - if self.defDict.get(self.n2field[nid]): - result = self.__fillX509Name(self.n2field[nid], self.defDict[self.n2field[nid]]) - if not result['OK']: - return result - for nid, value in [(self.fs2nid['CN'], userDict['FullName']), - (self.fs2nid['emailAddress'], userDict['Email'])]: - if nid in self.supplied + self.optional: - result = self.__fillX509Name(self.n2field[nid], value) - if not result['OK']: - return result # WARN: This logic not support list of distribtes name elements resDN = m2.x509_name_oneline(self.__X509Name.x509_name) # pylint: disable=no-member - if userDN and not userDN == resDN: - return S_ERROR('%s not match with generated DN: %s' % (userDN, resDN)) + + result = self.checkStatus(resDN) + if not result['OK']: + return result return S_OK(resDN) def __parseCACFG(self): @@ -252,8 +278,10 @@ def __parseCACFG(self): """ block = '' self.cfg = {} + self.supplied, self.optional, self.match, self.dnList = [], [], [], [] with open(self.parameters['CAConfigFile'], "r") as caCFG: for line in caCFG: + # Ignore comments line = re.sub(r'#.*', '', line) if re.findall(r"\[([A-Za-z0-9_]+)\]", line.replace(' ', '')): block = ''.join(re.findall(r"\[([A-Za-z0-9_]+)\]", line.replace(' ', ''))) @@ -269,6 +297,10 @@ def __parseCACFG(self): for b in self.cfg: if v in self.cfg[b]: val = val.replace('$' + v, self.cfg[b][v]) + if 'default_ca' in self.cfg.get('ca', {}): + if 'policy' in self.cfg.get(self.cfg['ca']['default_ca'], {}): + if block == self.cfg[self.cfg['ca']['default_ca']]['policy']: + self.dnList.append(field) self.cfg[block][field] = val.strip() self.bits = int(self.cfg['req'].get('default_bits') or self.bits) @@ -276,14 +308,17 @@ def __parseCACFG(self): if not self.parameters.get('CertFile'): self.parameters['CertFile'] = self.cfg[self.cfg['ca']['default_ca']]['certificate'] self.parameters['KeyFile'] = self.cfg[self.cfg['ca']['default_ca']]['private_key'] + # Read distinguished names for k, v in self.cfg[self.cfg[self.cfg['ca']['default_ca']]['policy']].items(): - nid = self.fs2nid[k] - if k + '_default' in self.cfg['req']['distinguished_name']: - self.parameters[nid] = self.cfg['req']['distinguished_name'][k + '_default'] - if k + '_min' in self.cfg['req']['distinguished_name']: - self.minDict[nid] = self.cfg['req']['distinguished_name'][k + '_min'] - if k + '_max' in self.cfg['req']['distinguished_name']: - self.maxDict[nid] = self.cfg['req']['distinguished_name'][k + '_max'] + nid = self.fields2nid[k] + self.parameters[nid], self.minDict[nid], self.maxDict[nid] = [], [], [] + for k in ['%s.%s' % (i, k) for i in range(0, 5)] + [k]: + if k + '_default' in self.cfg['req']['distinguished_name']: + self.parameters[nid].append(self.cfg['req']['distinguished_name'][k + '_default']) + if k + '_min' in self.cfg['req']['distinguished_name']: + self.minDict[nid].append(self.cfg['req']['distinguished_name'][k + '_min']) + if k + '_max' in self.cfg['req']['distinguished_name']: + self.maxDict[nid].append(self.cfg['req']['distinguished_name'][k + '_max']) if v == 'supplied': self.supplied.append(nid) elif v == 'optional': @@ -291,27 +326,45 @@ def __parseCACFG(self): elif v == 'match': self.match.append(nid) - def __fillX509Name(self, field, value): + def __parseDN(self, dn): + """ Return DN fields + + :param str dn: DN + + :return: list -- contain tuple with positionOfField.fieldName, fieldNID, fieldValue + """ + dnInfoDict = collections.OrderedDict() + for f, v in [f.split('=') for f in dn.lstrip('/').split('/')]: + if not v: + return S_ERROR('No value set for "%s"' % f) + if f not in dnInfoDict: + dnInfoDict[f] = [v] + else: + dnInfoDict[f].append(v) + return S_OK(dnInfoDict) + + def __fillX509Name(self, field, values): """ Fill x509_Name object by M2Crypto - :param basestring field: DN field name - :param basestring value: value of field + :param str field: DN field name + :param list values: values of field, order important :return: S_OK()/S_ERROR() """ - if value and m2.x509_name_set_by_nid(self.__X509Name.x509_name, # pylint: disable=no-member - self.fs2nid[field], value) == 0: - if not self.__X509Name.add_entry_by_txt(field=field, type=ASN1.MBSTRING_ASC, - entry=value, len=-1, loc=-1, set=0) == 1: - return S_ERROR('Cannot set "%s" field.' % field) + for value in values: + if value and m2.x509_name_set_by_nid(self.__X509Name.x509_name, # pylint: disable=no-member + self.fields2nid[field], value) == 0: + if not self.__X509Name.add_entry_by_txt(field=field, type=ASN1.MBSTRING_ASC, + entry=value, len=-1, loc=-1, set=0) == 1: + return S_ERROR('Cannot set "%s" field.' % field) return S_OK() def __createCertM2Crypto(self): """ Create new certificate for user - :return: S_OK(basestring, basestring)/S_ERROR() + :return: S_OK(tuple)/S_ERROR() -- tuple contain certificate and pulic key as strings """ - # Create publik key + # Create public key userPubKey = EVP.PKey() userPubKey.assign_rsa(RSA.gen_key(self.bits, 65537, util.quiet_genparam_callback)) # Create certificate @@ -347,3 +400,37 @@ def __createCertM2Crypto(self): userCertStr = userCert.as_pem() userPubKeyStr = userPubKey.as_pem(cipher=None, callback=util.no_passphrase_callback) return S_OK((userCertStr, userPubKeyStr)) + + def _forceGenerateProxyForDN(self, dn, time, group=None): + """ An additional helper method for creating a proxy without any substantial validation, + it can be used for a specific case(such as testing) where just need to generate a proxy + with specific DN on the fly. + + :param str dn: requested proxy DN + :param int time: expired time in a seconds + :param str group: if need to add DIRAC group + + :return: S_OK(tuple)/S_ERROR() -- contain proxy as chain and as string + """ + self.__X509Name = X509.X509_Name() + result = self.__parseDN(dn) + if not result['OK']: + return result + dnInfoDict = result['Value'] + + for field, values in dnInfoDict.items(): + result = self.__fillX509Name(field, values) + if not result['OK']: + return result + + result = self.__createCertM2Crypto() + if result['OK']: + certStr, keyStr = result['Value'] + chain = X509Chain() + if chain.loadChainFromString(certStr)['OK'] and chain.loadKeyFromString(keyStr)['OK']: + result = chain.generateProxyToString(time, rfc=True, diracGroup=group) + if not result['OK']: + return result + chain = X509Chain() + chain.loadProxyFromString(result['Value']) + return S_OK((chain, result['Value'])) diff --git a/Resources/ProxyProvider/PUSPProxyProvider.py b/Resources/ProxyProvider/PUSPProxyProvider.py index bb42ad1bcdf..324fc496a2e 100644 --- a/Resources/ProxyProvider/PUSPProxyProvider.py +++ b/Resources/ProxyProvider/PUSPProxyProvider.py @@ -18,49 +18,35 @@ def __init__(self, parameters=None): super(PUSPProxyProvider, self).__init__(parameters) - def getProxy(self, userDict): - """ Generate user proxy + def checkStatus(self, userDN): + """ Read ready to work status of proxy provider - :param dict userDict: user description dictionary with possible fields: - FullName, UserName, DN, Email, DiracGroup + :param str userDN: user DN - :return: S_OK(basestring)/S_ERROR() -- basestring is a proxy string + :return: S_OK()/S_ERROR() """ + # Search a unique identifier(username) to use as cn-label to generate PUSP + if not userDN.split(":")[-1]: + return S_ERROR('Can not found user label for DN: %s' % userDN) + if not self.parameters.get('ServiceURL'): + return S_ERROR('Can not determine PUSP service URL') + + return S_OK() - userDN = userDict.get('DN') - if not userDN: - return S_ERROR('Incomplete user information') + def getProxy(self, userDN): + """ Generate user proxy - diracGroup = userDict.get('DiracGroup') - if not diracGroup: - return S_ERROR('Incomplete user information') + :param str userDN: user DN - result = Registry.getGroupsForDN(userDN) + :return: S_OK(str)/S_ERROR() -- contain a proxy string + """ + result = self.checkStatus(userDN) if not result['OK']: return result - validGroups = result['Value'] - if diracGroup not in validGroups: - return S_ERROR('Invalid group %s for user' % diracGroup) - - voName = Registry.getVOForGroup(diracGroup) - if not voName: - return S_ERROR('Can not determine VO for group %s' % diracGroup) - - csVOMSMapping = Registry.getVOMSAttributeForGroup(diracGroup) - if not csVOMSMapping: - return S_ERROR("No VOMS mapping defined for group %s in the CS" % diracGroup) - vomsAttribute = csVOMSMapping - vomsVO = Registry.getVOMSVOForGroup(diracGroup) - - puspServiceURL = self.parameters.get('ServiceURL') - if not puspServiceURL: - return S_ERROR('Can not determine PUSP service URL for VO %s' % voName) - - user = userDN.split(":")[-1] - - puspURL = "%s?voms=%s:%s&proxy-renewal=false&disable-voms-proxy=false" \ - "&rfc-proxy=true&cn-label=user:%s" % (puspServiceURL, vomsVO, vomsAttribute, user) + puspURL = self.parameters['ServiceURL'] + puspURL += "?proxy-renewal=false&disable-voms-proxy=true&rfc-proxy=true" + puspURL += "&cn-label=user:%s" % userDN.split(":")[-1] try: proxy = urlopen(puspURL).read() @@ -77,24 +63,5 @@ def getProxy(self, userDict): credDict = result['Value'] if credDict['identity'] != userDN: return S_ERROR('Requested DN does not match the obtained one in the PUSP proxy') - timeLeft = credDict['secondsLeft'] - - result = chain.generateProxyToString(lifeTime=timeLeft, - diracGroup=diracGroup) - if not result['OK']: - return result - proxyString = result['Value'] - return S_OK((proxyString, timeLeft)) - - def getUserDN(self, userDict): - """ Get DN of the user certificate that will be created - - :param dict userDict: - :return: S_OK/S_ERROR, Value is the DN string - """ - - userDN = userDict.get('DN') - if not userDN: - return S_ERROR('Incomplete user information') - return S_OK(userDN) + return chain.generateProxyToString(lifeTime=credDict['secondsLeft']) diff --git a/Resources/ProxyProvider/ProxyProvider.py b/Resources/ProxyProvider/ProxyProvider.py index 615683d6483..cd494966818 100644 --- a/Resources/ProxyProvider/ProxyProvider.py +++ b/Resources/ProxyProvider/ProxyProvider.py @@ -1,5 +1,6 @@ """ ProxyProvider base class for various proxy providers """ +from DIRAC import S_OK, S_ERROR __RCSID__ = "$Id$" @@ -11,8 +12,26 @@ def __init__(self, parameters=None): self.parameters = parameters self.name = None if parameters: - self.name = parameters.get('ProxyProviderName') + self.name = parameters.get('ProviderName') def setParameters(self, parameters): self.parameters = parameters - self.name = parameters.get('ProxyProviderName') + self.name = parameters.get('ProviderName') + + def checkStatus(self, userDN): + """ Read ready to work status of proxy provider + + :param str userDN: user DN + + :return: S_OK()/S_ERROR() + """ + return S_OK() + + def generateDN(self, **kwargs): + """ Generate new DN + + :param dict kwargs: user description dictionary + + :return: S_OK(str)/S_ERROR() -- contain DN + """ + return S_ERROR("Not implemented in %s", self.name) diff --git a/Resources/ProxyProvider/test/Test_DIRACCAProxyProvider.py b/Resources/ProxyProvider/test/Test_DIRACCAProxyProvider.py new file mode 100644 index 00000000000..e5fa9806b68 --- /dev/null +++ b/Resources/ProxyProvider/test/Test_DIRACCAProxyProvider.py @@ -0,0 +1,174 @@ +""" This is a test of the DIRACCAProxyProvider +""" + +# pylint: disable=invalid-name,wrong-import-position,protected-access +import os +import re +import sys +import shutil +import commands +import unittest +import tempfile + +from DIRAC import gLogger +from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error +from DIRAC.Resources.ProxyProvider.DIRACCAProxyProvider import DIRACCAProxyProvider + + +thisPath = os.path.dirname(os.path.abspath(__file__)).split('/') +rootPath = thisPath[:len(thisPath) - 3] +certsPath = os.path.join('/'.join(rootPath), 'Core/Security/test/certs') + +testCAPath = os.path.join(tempfile.mkdtemp(dir='/tmp'), 'ca') +testCAConfigFile = os.path.join(testCAPath, 'openssl_config_ca.cnf') + +diracCADict = {'ProviderType': 'DIRACCA', + 'CertFile': os.path.join(certsPath, 'ca/ca.cert.pem'), + 'KeyFile': os.path.join(certsPath, 'ca/ca.key.pem'), + 'Supplied': ['O', 'OU', 'CN'], + 'Optional': ['emailAddress'], + 'DNOrder': ['O', 'OU', 'CN', 'emailAddress'], + 'OU': 'CA', + 'C': 'DN', + 'O': 'DIRACCA', + 'ProviderName': 'DIRAC_CA'} +diracCAConf = {'ProviderType': 'DIRACCA', + 'CAConfigFile': testCAConfigFile, + 'ProviderName': 'DIRAC_CA_CFG'} + + +class DIRACCAProviderTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.failed = False + + shutil.copytree(os.path.join(certsPath, 'ca'), testCAPath) + + # Parse + lines = [] + with open(testCAConfigFile, "r") as caCFG: + for line in caCFG: + if re.findall('=', re.sub(r'#.*', '', line)): + # Ignore comments + field = re.sub(r'#.*', '', line).replace(' ', '').rstrip().split('=')[0] + # Put the right dir + line = 'dir = %s #PUT THE RIGHT DIR HERE!\n' % (testCAPath) if field == 'dir' else line + lines.append(line) + # Write modified conf. file + with open(testCAConfigFile, "w") as caCFG: + caCFG.writelines(lines) + + # Result + status, output = commands.getstatusoutput('ls -al %s' % testCAPath) + if status: + gLogger.error(output) + exit() + gLogger.debug('Test path:\n', output) + + def setUp(self): + gLogger.debug('\n') + if self.failed: + self.fail(self.failed) + + def tearDown(self): + pass + + @classmethod + def tearDownClass(cls): + if os.path.exists(testCAPath): + shutil.rmtree(testCAPath) + + +class testDIRACCAProvider(DIRACCAProviderTestCase): + + def test_getProxy(self): + """ Test 'getProxy' - try to get proxies for different users and check it + """ + def check(proxyStr, proxyProvider, name): + """ Check proxy + + :param str proxyStr: proxy as string + :param str proxyProvider: proxy provider name + :param str name: proxy name + """ + proxyFile = os.path.join(testCAPath, proxyProvider + name.replace(' ', '') + '.pem') + gLogger.info('Check proxy..') + chain = X509Chain() + result = chain.loadProxyFromString(proxyStr) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + for result in [chain.getRemainingSecs(), + chain.getIssuerCert(), + chain.getPKeyObj(), + chain.getCertList(), + chain.getNumCertsInChain(), + chain.generateProxyToString(3600), + chain.generateProxyToFile(proxyFile, 3600), + chain.isProxy(), + chain.isLimitedProxy(), + chain.isValidProxy(), + chain.isVOMS(), + chain.isRFC()]: + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + + for proxyProvider, log in [(diracCADict, 'configuring only in DIRAC CFG'), + (diracCAConf, 'read configuration file')]: + gLogger.info('\n* Try proxy provider that %s..' % log) + ca = DIRACCAProxyProvider() + result = ca.setParameters(proxyProvider) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + + gLogger.info('* Get proxy using FullName and Email of user..') + for name, email, res in [('MrUser', 'good@mail.com', True), + ('MrUser_1', 'good_1@mail.com', True), + (False, 'good@mail.com', False), + ('MrUser', False, True)]: + gLogger.info('\nFullName: %s' % name or 'absent', 'Email: %s..' % email or 'absent') + # Create user DN + result = ca.generateDN(FullName=name, Email=email) + text = 'Must be ended %s%s' % ('successful' if res else 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) + if not res: + gLogger.info('Msg: %s' % (result['Message'])) + else: + userDN = result['Value'] + gLogger.info('Created DN:', userDN) + + result = ca.getProxy(userDN) + text = 'Must be ended %s%s' % ('successful' if res else 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) + if not res: + gLogger.info('Msg: %s' % (result['Message'])) + else: + check(result['Value'], proxyProvider['ProviderName'], name) + + gLogger.info('\n* Get proxy using user DN..') + for dn, name, res in [('/O=DIRAC/OU=DIRAC CA/CN=user_3/emailAddress=some@mail.org', 'user_3', True), + ('/O=Dirac/OU=DIRAC CA/CN=user/emailAddress=some@mail.org', 'user', True), + ('/O=Dirac/OU=Without supplied field/emailAddress=some@mail.org', 'not_suplied', False), + ('/O=Dirac/OU=DIRAC CA/CN=without email', 'no_email', True), + ('/some=bad/DN=', 'badDN', False), + ('/BF=Bad Field/O=IN/CN=DN', 'badField', False), + (False, 'absent', False)]: + gLogger.info('\nDN:', dn or 'absent') + try: + result = ca.getProxy(dn) + except Exception as e: + result['Message'] = str(e) + self.assertFalse(res, e) + text = 'Must be ended %s%s' % ('successful' if res else 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) + if not res: + gLogger.info('Msg: %s' % (result['Message'])) + else: + check(result['Value'], proxyProvider['ProviderName'], name) + + +if __name__ == '__main__': + suite = unittest.defaultTestLoader.loadTestsFromTestCase(DIRACCAProviderTestCase) + suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(testDIRACCAProvider)) + testResult = unittest.TextTestRunner(verbosity=2).run(suite) + sys.exit(not testResult.wasSuccessful()) diff --git a/Resources/ProxyProvider/test/__init__.py b/Resources/ProxyProvider/test/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/Integration/Framework/Test_ProxyDB.py b/tests/Integration/Framework/Test_ProxyDB.py index cfd6ce76047..f501e5c16ca 100644 --- a/tests/Integration/Framework/Test_ProxyDB.py +++ b/tests/Integration/Framework/Test_ProxyDB.py @@ -19,12 +19,12 @@ from DIRAC.Core.Utilities.CFG import CFG from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error from DIRAC.FrameworkSystem.DB.ProxyDB import ProxyDB +from DIRAC.Resources.ProxyProvider.DIRACCAProxyProvider import DIRACCAProxyProvider -# For Jenkins -for f in ['', 'TestCode', os.environ['DIRAC']]: - certsPath = os.path.join(f, 'DIRAC/Core/Security/test/certs') - if os.path.exists(certsPath): - break +certsPath = os.path.join(os.environ['DIRAC'], 'DIRAC/Core/Security/test/certs') +ca = DIRACCAProxyProvider() +ca.setParameters({'CertFile': os.path.join(certsPath, 'ca/ca.cert.pem'), + 'KeyFile': os.path.join(certsPath, 'ca/ca.key.pem')}) diracTestCACFG = """ Resources @@ -38,6 +38,7 @@ KeyFile = %s Supplied = C, O, OU, CN Optional = emailAddress + DNOrder = C, O, OU, CN, emailAddress OU = None C = DN O = DIRACCA @@ -64,12 +65,24 @@ } } } + user + { + DN = /C=CC/O=DN/O=DIRAC/CN=user + DNProperties + { + -C_CC-O_DN-O_DIRAC-CN_user + { + ProxyProviders = + Groups = dirac_user + } + } + } user_1 { - DN = /C=DN/O=DIRAC/CN=user_1 + DN = /C=CC/O=DN/O=DIRAC/CN=user_1 DNProperties { - -C_DN-O_DIRAC-OU_user_1 + -C_CC-O_DN-O_DIRAC-CN_user_1 { ProxyProviders = Groups = dirac_user @@ -78,29 +91,29 @@ } user_2 { - DN = /C=DN/O=DIRAC/CN=user_2 + DN = /C=CC/O=DN/O=DIRAC/CN=user_2 DNProperties { - -C_DN-O_DIRAC-OU_user_2 + -C_CC-O_DN-O_DIRAC-CN_user_2 { } } } user_3 { - DN = /C=DN/O=DIRAC/CN=user_3 + DN = /C=CC/O=DN/O=DIRAC/CN=user_3 } # Not in dirac_user group user_4 { - DN = /C=DN/O=DIRAC/CN=user_4 + DN = /C=CC/O=DN/O=DIRAC/CN=user_4 } } Groups { group_1 { - Users = user_ca, user_1, user_2, user_3 + Users = user_ca, user, user_1, user_2, user_3 VO = vo_1 } group_2 @@ -128,12 +141,20 @@ class ProxyDBTestCase(unittest.TestCase): @classmethod - def createProxy(self, userName, group, time, rfc=True, limit=False, vo=None, role=None, path=None): + def createProxy(self, userName, group, time, vo=None, role=None): """ Create user proxy + + :param str userName: user name + :param str group: group name + :param int time: proxy expired time + :param str vo: VOMS VO name + :param str role: VOMS Role + + :return: S_OK(tuple)/S_ERROR() -- contain proxy as and as string """ userCertFile = os.path.join(self.userDir, userName + '.cert.pem') userKeyFile = os.path.join(self.userDir, userName + '.key.pem') - self.proxyPath = path or os.path.join(self.userDir, userName + '.pem') + self.proxyPath = os.path.join(self.userDir, userName + '.pem') if not vo: chain = X509Chain() # Load user cert and key @@ -147,22 +168,16 @@ def createProxy(self, userName, group, time, rfc=True, limit=False, vo=None, rol if 'bad decrypt' in retVal['Message']: return S_ERROR("Bad passphrase") return S_ERROR("Can't load %s" % userKeyFile) - result = chain.generateProxyToFile(self.proxyPath, time * 3600, - limited=limit, diracGroup=group, - rfc=rfc) + result = chain.generateProxyToFile(self.proxyPath, time * 3600, diracGroup=group) if not result['OK']: return result else: cmd = 'voms-proxy-fake --cert %s --key %s -q' % (userCertFile, userKeyFile) cmd += ' -hostcert %s -hostkey %s' % (self.hostCert, self.hostKey) cmd += ' -uri fakeserver.cern.ch:15000' - cmd += ' -voms "%s%s"' % (vo, role and ':%s' % role or '') + cmd += ' -voms "%s"' % vo cmd += ' -fqan "/%s/Role=%s/Capability=NULL"' % (vo, role) - cmd += ' -hours %s -out %s' % (time, self.proxyPath) - if limit: - cmd += ' -limited' - if rfc: - cmd += ' -rfc' + cmd += ' -hours %s -out %s -rfc' % (time, self.proxyPath) status, output = commands.getstatusoutput(cmd) if status: return S_ERROR(output) @@ -238,16 +253,17 @@ def setUpClass(cls): cls.userDir = tempfile.mkdtemp(dir=certsPath) # Create user certificates - for userName in ['no_user', 'user_1', 'user_2', 'user_3']: + for userName in ['no_user', 'user', 'user_1', 'user_2', 'user_3']: userConf = """[ req ] - default_bits = 2048 + default_bits = 4096 encrypt_key = yes distinguished_name = req_dn prompt = no req_extensions = v3_req [ req_dn ] - C = DN - O = DIRAC + C = CC + O = DN + 0.O = DIRAC CN = %s [ v3_req ] # Extensions for client certificates (`man x509v3_config`). @@ -261,7 +277,7 @@ def setUpClass(cls): userCertFile = os.path.join(cls.userDir, userName + '.cert.pem') with open(userConfFile, "w") as f: f.write(userConf) - status, output = commands.getstatusoutput('openssl genrsa -out %s 2048' % userKeyFile) + status, output = commands.getstatusoutput('openssl genrsa -out %s' % userKeyFile) if status: gLogger.error(output) exit() @@ -281,15 +297,23 @@ def setUpClass(cls): exit() gLogger.debug(output) + # Result + status, output = commands.getstatusoutput('ls -al %s' % cls.userDir) + if status: + gLogger.error(output) + exit() + gLogger.debug('User certificates:\n', output) + def setUp(self): + gLogger.debug('\n') if self.failed: self.fail(self.failed) - db._update('DELETE FROM ProxyDB_Proxies WHERE UserName IN ("user_ca", "user_1", "user_2", "user_3")') - db._update('DELETE FROM ProxyDB_CleanProxies WHERE UserName IN ("user_ca", "user_1", "user_2", "user_3")') + db._update('DELETE FROM ProxyDB_Proxies WHERE UserName IN ("user_ca", "user", "user_1", "user_2", "user_3")') + db._update('DELETE FROM ProxyDB_CleanProxies WHERE UserName IN ("user_ca", "user", "user_1", "user_2", "user_3")') def tearDown(self): - db._update('DELETE FROM ProxyDB_Proxies WHERE UserName IN ("user_ca", "user_1", "user_2", "user_3")') - db._update('DELETE FROM ProxyDB_CleanProxies WHERE UserName IN ("user_ca", "user_1", "user_2", "user_3")') + db._update('DELETE FROM ProxyDB_Proxies WHERE UserName IN ("user_ca", "user", "user_1", "user_2", "user_3")') + db._update('DELETE FROM ProxyDB_CleanProxies WHERE UserName IN ("user_ca", "user", "user_1", "user_2", "user_3")') @classmethod def tearDownClass(cls): @@ -319,44 +343,49 @@ def test_connectDB(self): self.assertTrue(res['OK']) def test_getUsers(self): - """ Try to get users from DB + """ Test 'getUsers' - try to get users from DB """ - field = '("%%s", "/C=DN/O=DIRAC/CN=%%s", %%s "PEM", TIMESTAMPADD(SECOND, %%s, UTC_TIMESTAMP()))%s' % '' + field = '("%%s", "/C=CC/O=DN/O=DIRAC/CN=%%s", %%s "PEM", TIMESTAMPADD(SECOND, %%s, UTC_TIMESTAMP()))%s' % '' # Fill table for test + gLogger.info('\n* Fill tables for test..') for table, values, fields in [('ProxyDB_Proxies', - [field % ('user_1', 'user_1', '"group_1",', '800'), + [field % ('user', 'user', '"group_1",', '800'), field % ('user_2', 'user_2', '"group_1",', '-1')], '(UserName, UserDN, UserGroup, Pem, ExpirationTime)'), ('ProxyDB_CleanProxies', [field % ('user_3', 'user_3', '', '43200')], '(UserName, UserDN, Pem, ExpirationTime)')]: result = db._update('INSERT INTO %s%s VALUES %s ;' % (table, fields, ', '.join(values))) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - for user, exp, expect, log in [(False, 0, ['user_1', 'user_2', 'user_3'], '\n* Without arguments'), + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + # Testing 'getUsers' + gLogger.info('\n* Run `purgeExpiredProxies()`..') + for user, exp, expect, log in [(False, 0, ['user', 'user_2', 'user_3'], '\n* Without arguments'), (False, 1200, ['user_3'], '* Request proxy live time'), ('user_2', 0, ['user_2'], '* Request user name'), ('no_user', 0, [], '* Request not exist user name')]: gLogger.info('%s..' % log) result = db.getUsers(validSecondsLeft=exp, userMask=user) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) usersList = [] for line in result['Value']: - if line['Name'] in ['user_1', 'user_2', 'user_3']: + if line['Name'] in ['user', 'user_2', 'user_3']: usersList.append(line['Name']) - self.assertEqual(set(expect), set(usersList), '%s, when expected %s' % (usersList, expect)) + self.assertEqual(set(expect), set(usersList), str(usersList) + ', when expected ' + str(expect)) def test_purgeExpiredProxies(self): - """ Try to purge expired proxies + """ Test 'purgeExpiredProxies' - try to purge expired proxies """ + # Purge existed proxies + gLogger.info('\n* First cleaning..') cmd = 'INSERT INTO ProxyDB_Proxies(UserName, UserDN, UserGroup, Pem, ExpirationTime) VALUES ' - cmd += '("user_1", "/C=DN/O=DIRAC/CN=user_1", "group_1", "PEM", ' + cmd += '("user", "/C=CC/O=DN/O=DIRAC/CN=user", "group_1", "PEM", ' cmd += 'TIMESTAMPADD(SECOND, -1, UTC_TIMESTAMP()));' result = db._query(cmd) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) cmd = 'SELECT COUNT( * ) FROM ProxyDB_Proxies WHERE ExpirationTime < UTC_TIMESTAMP()' self.assertTrue(bool(db._query(cmd)['Value'][0][0] > 0)) result = db.purgeExpiredProxies() - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(result['Value'] > 0, 'Must be more then null') self.assertFalse(bool(db._query(cmd)['Value'][0][0] > 0), "Must be null") @@ -364,105 +393,120 @@ def test_getRemoveProxy(self): """ Testing get, store proxy """ gLogger.info('\n* Check that DB is clean..') - result = db.getProxiesContent({'UserName': ['user_ca', 'user_1', 'user_2', 'user_3']}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = db.getProxiesContent({'UserName': ['user_ca', 'user', 'user_1' 'user_2', 'user_3']}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 0), 'In DB present proxies.') gLogger.info('* Check posible crashes when get proxy..') # Make record with not valid proxy, valid group, user and short expired time cmd = 'INSERT INTO ProxyDB_Proxies(UserName, UserDN, UserGroup, Pem, ExpirationTime) VALUES ' - cmd += '("user_1", "/C=DN/O=DIRAC/CN=user_1", "group_1", "PEM", ' + cmd += '("user", "/C=CC/O=DN/O=DIRAC/CN=user", "group_1", "PEM", ' cmd += 'TIMESTAMPADD(SECOND, 1800, UTC_TIMESTAMP()));' result = db._update(cmd) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) # Try to no correct getProxy requests - for dn, group, reqtime, log in [('/C=DN/O=DIRAC/CN=user_1', 'group_1', 9999, + for dn, group, reqtime, log in [('/C=CC/O=DN/O=DIRAC/CN=user', 'group_1', 9999, 'No proxy provider, set request time, not valid proxy in ProxyDB_Proxies'), - ('/C=DN/O=DIRAC/CN=user_1', 'group_1', 0, + ('/C=CC/O=DN/O=DIRAC/CN=user', 'group_1', 0, 'Not valid proxy in ProxyDB_Proxies'), - ('/C=DN/O=DIRAC/CN=no_user', 'no_valid_group', 0, - 'User no exist, proxy not in DB tables'), - ('/C=DN/O=DIRAC/CN=user_1', 'no_valid_group', 0, + ('/C=CC/O=DN/O=DIRAC/CN=no_user', 'no_valid_group', 0, + 'User not exist, proxy not in DB tables'), + ('/C=CC/O=DN/O=DIRAC/CN=user', 'no_valid_group', 0, 'Group not valid, proxy not in DB tables'), - ('/C=DN/O=DIRAC/CN=user_1', 'group_1', 0, + ('/C=CC/O=DN/O=DIRAC/CN=user', 'group_1', 0, 'No proxy provider for user, proxy not in DB tables'), - ('/C=DN/O=DIRAC/CN=user_4', 'group_2', 0, + ('/C=CC/O=DN/O=DIRAC/CN=user_4', 'group_2', 0, 'Group has option enableToDownload = False in CS')]: gLogger.info('== > %s:' % log) result = db.getProxy(dn, group, reqtime) self.assertFalse(result['OK'], 'Must be fail.') gLogger.info('Msg: %s' % result['Message']) # In the last case method found proxy and must to delete it as not valid - cmd = 'SELECT COUNT( * ) FROM ProxyDB_Proxies WHERE UserName="user_1"' - self.assertTrue(bool(db._query(cmd)['Value'][0][0] == 0), "GetProxy method didn't delete proxy.") + cmd = 'SELECT COUNT( * ) FROM ProxyDB_Proxies WHERE UserName="user"' + self.assertTrue(bool(db._query(cmd)['Value'][0][0] == 0), "GetProxy method didn't delete the last proxy.") gLogger.info('* Check that DB is clean..') - result = db.getProxiesContent({'UserName': ['user_ca', 'user_1', 'user_2', 'user_3']}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = db.getProxiesContent({'UserName': ['user_ca', 'user', 'user_1', 'user_2', 'user_3']}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 0), 'In DB present proxies.') gLogger.info('* Generate proxy on the fly..') result = db.getProxy('/C=DN/O=DIRACCA/OU=None/CN=user_ca/emailAddress=user_ca@diracgrid.org', 'group_1', 1800) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) gLogger.info('* Check that ProxyDB_CleanProxy contain generated proxy..') result = db.getProxiesContent({'UserName': 'user_ca'}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 1), 'Generated proxy must be one.') for table, count in [('ProxyDB_Proxies', 0), ('ProxyDB_CleanProxies', 1)]: cmd = 'SELECT COUNT( * ) FROM %s WHERE UserName="user_ca"' % table self.assertTrue(bool(db._query(cmd)['Value'][0][0] == count), - '%s must %s' % (table, count and 'contain proxy' or 'be empty')) + table + ' must ' + (count and 'contain proxy' or 'be empty')) gLogger.info('* Check that DB is clean..') result = db.deleteProxy('/C=DN/O=DIRACCA/OU=None/CN=user_ca/emailAddress=user_ca@diracgrid.org', proxyProvider='DIRAC_CA') - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - result = db.getProxiesContent({'UserName': ['user_ca', 'user_1', 'user_2', 'user_3']}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + result = db.getProxiesContent({'UserName': ['user_ca', 'user', 'user_1', 'user_2', 'user_3']}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 0), 'In DB present proxies.') gLogger.info('* Upload proxy..') - for user, dn, group, vo, time, res, log in [("user_1", '/C=DN/O=DIRAC/CN=user_1', "group_1", False, 12, + for user, dn, group, vo, time, res, log in [("user", '/C=CC/O=DN/O=DIRAC/CN=user', "group_1", False, 12, True, 'With group extension'), - ("user_1", '/C=DN/O=DIRAC/CN=user_1', False, "vo_1", 12, + ("user", '/C=CC/O=DN/O=DIRAC/CN=user', False, "vo_1", 12, False, 'With voms extension'), - ("user_1", '/C=DN/O=DIRAC/CN=user_1', False, False, 0, + ("user_1", '/C=CC/O=DN/O=DIRAC/CN=user_1', False, "vo_1", 12, + False, 'With voms extension'), + ("user", '/C=CC/O=DN/O=DIRAC/CN=user', False, False, 0, False, 'Expired proxy'), - ("no_user", '/C=DN/O=DIRAC/CN=no_user', False, False, 12, + ("no_user", '/C=CC/O=DN/O=DIRAC/CN=no_user', False, False, 12, False, 'Not exist user'), - ("user_1", '/C=DN/O=DIRAC/CN=user_1', False, False, 12, + ("user", '/C=CC/O=DN/O=DIRAC/CN=user', False, False, 12, True, 'Valid proxy')]: - result = db._update('DELETE FROM ProxyDB_Proxies WHERE UserName = "user_1"') - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + for table in ['ProxyDB_Proxies', 'ProxyDB_CleanProxies']: + result = db._update('DELETE FROM %s WHERE UserName = "user"' % table) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + result = db._update('DELETE FROM %s WHERE UserName = "user_1"' % table) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) gLogger.info('== > %s:' % log) + result = self.createProxy(user, group, time, vo=vo) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) chain = result['Value'][0] + + # Assert VOMSProxy + if vo: + self.assertTrue(bool(chain.isVOMS().get('Value')), 'Cannot create proxy with VOMS extension') + result = db.generateDelegationRequest(chain, dn) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) resDict = result['Value'] result = chain.generateChainFromRequestString(resDict['request'], time * 3500) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + if not chain.isVOMS().get('Value') and vo: + gLogger.info('voms-proxy-fake command not working as expected, so proxy have no VOMS extention') + res = not res result = db.completeDelegation(resDict['id'], dn, result['Value']) - self.assertEqual(result['OK'], res, 'Must be ended %s%s' % - (res and 'successful' or 'with error', - ': %s' % result.get('Message') or 'Error message is absent.')) + text = 'Must be ended %s%s' % (res and 'successful' or 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) if not res: gLogger.info('Msg: %s' % (result['Message'])) cmd = 'SELECT COUNT( * ) FROM ProxyDB_Proxies WHERE UserName="%s"' % user self.assertTrue(bool(db._query(cmd)['Value'][0][0] == (res and group and 1) or 0), - 'ProxyDB_Proxies must %s' % (res and 'contain proxy' or 'be empty')) + 'ProxyDB_Proxies must ' + (res and 'contain proxy' or 'be empty')) cmd = 'SELECT COUNT( * ) FROM ProxyDB_CleanProxies WHERE UserName="%s"' % user self.assertTrue(bool(db._query(cmd)['Value'][0][0] == (res and not group and 1) or 0), - 'ProxyDB_CleanProxies must %s' % (res and 'contain proxy' or 'be empty')) + 'ProxyDB_CleanProxies must ' + (res and 'contain proxy' or 'be empty')) gLogger.info('* Check that ProxyDB_CleanProxy contain generated proxy..') - result = db.getProxiesContent({'UserName': 'user_1'}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = db.getProxiesContent({'UserName': 'user'}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 1), 'Generated proxy must be one.') - cmd = 'SELECT COUNT( * ) FROM ProxyDB_CleanProxies WHERE UserName="user_1"' + cmd = 'SELECT COUNT( * ) FROM ProxyDB_CleanProxies WHERE UserName="user"' self.assertTrue(bool(db._query(cmd)['Value'][0][0] == 1), 'ProxyDB_CleanProxies must contain proxy') gLogger.info('* Get proxy that store only in ProxyDB_CleanProxies..') @@ -471,86 +515,96 @@ def test_getRemoveProxy(self): (False, 'group_2', 0, 'Request group not contain user'), (True, 'group_1', 0, 'Request time less that in stored proxy')]: gLogger.info('== > %s:' % log) - result = db.getProxy('/C=DN/O=DIRAC/CN=user_1', group, reqtime) - self.assertEqual(result['OK'], res, 'Must be ended %s%s' % - (res and 'successful' or 'with error', - ': %s' % result.get('Message') or 'Error message is absent.')) + result = db.getProxy('/C=CC/O=DN/O=DIRAC/CN=user', group, reqtime) + text = 'Must be ended %s%s' % (res and 'successful' or 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) if res: chain = result['Value'][0] - self.assertTrue(chain.isValidProxy()['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(chain.isValidProxy()['OK'], '\n' + result.get('Message', 'Error message is absent.')) result = chain.getDIRACGroup() - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - self.assertEqual('group_1', result['Value'], 'Group must be group_1, not %s' % result['Value']) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + self.assertEqual('group_1', result['Value'], 'Group must be group_1, not ' + result['Value']) else: gLogger.info('Msg: %s' % (result['Message'])) gLogger.info('* Check that DB is clean..') - result = db.deleteProxy('/C=DN/O=DIRAC/CN=user_1', proxyProvider='Certificate') - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - result = db.getProxiesContent({'UserName': ['user_ca', 'user_1', 'user_2', 'user_3']}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = db.deleteProxy('/C=CC/O=DN/O=DIRAC/CN=user', proxyProvider='Certificate') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + result = db.getProxiesContent({'UserName': ['user_ca', 'user', 'user_2', 'user_3']}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 0), 'In DB present proxies.') gLogger.info('* Get proxy when it store only in ProxyDB_Proxies..') # Make record with proxy that contain group - result = self.createProxy('user_1', group, 12) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = ca._forceGenerateProxyForDN('/C=CC/O=DN/O=DIRAC/CN=user', 12 * 3600, group='group_1') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) proxyStr = result['Value'][1] cmd = 'INSERT INTO ProxyDB_Proxies(UserName, UserDN, UserGroup, Pem, ExpirationTime) VALUES ' - cmd += '("user_1", "%s", "%s", "%s", TIMESTAMPADD(SECOND, 43200, UTC_TIMESTAMP()))' % (dn, group, + cmd += '("user", "%s", "%s", "%s", TIMESTAMPADD(SECOND, 43200, UTC_TIMESTAMP()))' % (dn, group, proxyStr) result = db._update(cmd) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) # Try to get it result = db.getProxy(dn, group, 1800) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) # Check that proxy contain group chain = result['Value'][0] - self.assertTrue(chain.isValidProxy()['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(chain.isValidProxy()['OK'], '\n' + result.get('Message', 'Error message is absent.')) result = chain.getDIRACGroup() - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - self.assertEqual('group_1', result['Value'], 'Group must be group_1, not %s' % result['Value']) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + self.assertEqual('group_1', result['Value'], 'Group must be group_1, not ' + result['Value']) gLogger.info('* Check that DB is clean..') - result = db.deleteProxy('/C=DN/O=DIRAC/CN=user_1') - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - result = db.getProxiesContent({'UserName': ['user_ca', 'user_1', 'user_2', 'user_3']}, {}) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + result = db.deleteProxy('/C=CC/O=DN/O=DIRAC/CN=user') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + result = db.getProxiesContent({'UserName': ['user_ca', 'user', 'user_1', 'user_2', 'user_3']}, {}) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) self.assertTrue(bool(int(result['Value']['TotalRecords']) == 0), 'In DB present proxies.') gLogger.info('* Get VOMS proxy..') - # Create proxy with VOMS extension - result = self.createProxy('user_1', 'group_1', 12, vo='vo_1', role='role_2') - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + for vomsuser in ['user', 'user_1']: + # Create proxy with VOMS extension + result = self.createProxy(vomsuser, 'group_1', 12, vo='vo_1', role='role_2') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) + chain, proxyStr = result['Value'] + + # Assert VOMSProxy + self.assertTrue(bool(chain.isVOMS().get('Value')), 'Cannot create proxy with VOMS extension') + + cmd = 'INSERT INTO ProxyDB_Proxies(UserName, UserDN, UserGroup, Pem, ExpirationTime) VALUES ' + cmd += '("%s", "/C=CC/O=DN/O=DIRAC/CN=%s", "group_1", "%s", ' % (vomsuser, vomsuser, proxyStr) + cmd += 'TIMESTAMPADD(SECOND, 43200, UTC_TIMESTAMP()))' + result = db._update(cmd) + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) - proxyStr = result['Value'][1] - cmd = 'INSERT INTO ProxyDB_Proxies(UserName, UserDN, UserGroup, Pem, ExpirationTime) VALUES ' - cmd += '("user_1", "/C=DN/O=DIRAC/CN=user_1", "group_1", "%s", ' % proxyStr - cmd += 'TIMESTAMPADD(SECOND, 43200, UTC_TIMESTAMP()))' - result = db._update(cmd) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') # Try to get proxy with VOMS extension - for dn, group, role, time, log in [('/C=DN/O=DIRAC/CN=user_4', 'group_2', False, 9999, + for dn, group, role, time, log in [('/C=CC/O=DN/O=DIRAC/CN=user_4', 'group_2', False, 9999, 'Not exist VO for current group'), - ('/C=DN/O=DIRAC/CN=user_1', 'group_1', 'role_1', 9999, + ('/C=CC/O=DN/O=DIRAC/CN=user', 'group_1', 'role_1', 9999, + 'Stored proxy already have different VOMS extension'), + ('/C=CC/O=DN/O=DIRAC/CN=user_1', 'group_1', 'role_1', 9999, 'Stored proxy already have different VOMS extension'), ('/C=DN/O=DIRACCA/OU=None/CN=user_ca/emailAddress=user_ca@diracgrid.org', 'group_1', 'role_1', 9999, 'Not correct VO configuration')]: - gLogger.info('== > %s:' % log) + gLogger.info('== > %s(DN: %s):' % (log, dn)) + if not any([dn, group, role, time, log]): + gLogger.info('voms-proxy-fake command not working as expected, proxy have no VOMS extention, go to the next..') + continue result = db.getVOMSProxy(dn, group, time, role) self.assertFalse(result['OK'], 'Must be fail.') gLogger.info('Msg: %s' % result['Message']) # Check stored proxies - for table, user, count in [('ProxyDB_Proxies', 'user_1', 1), ('ProxyDB_CleanProxies', 'user_ca', 1)]: + for table, user, count in [('ProxyDB_Proxies', 'user', 1), ('ProxyDB_CleanProxies', 'user_ca', 1)]: cmd = 'SELECT COUNT( * ) FROM %s WHERE UserName="%s"' % (table, user) self.assertTrue(bool(db._query(cmd)['Value'][0][0] == count)) gLogger.info('* Delete proxies..') - for dn, table in [('/C=DN/O=DIRAC/CN=user_1', 'ProxyDB_Proxies'), + for dn, table in [('/C=CC/O=DN/O=DIRAC/CN=user', 'ProxyDB_Proxies'), ('/C=DN/O=DIRACCA/OU=None/CN=user_ca/emailAddress=user_ca@diracgrid.org', 'ProxyDB_CleanProxies')]: result = db.deleteProxy(dn) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + self.assertTrue(result['OK'], '\n' + result.get('Message', 'Error message is absent.')) cmd = 'SELECT COUNT( * ) FROM %s WHERE UserName="user_ca"' % table self.assertTrue(bool(db._query(cmd)['Value'][0][0] == 0)) diff --git a/tests/Integration/Resources/ProxyProvider/Test_DIRACCAProxyProvider.py b/tests/Integration/Resources/ProxyProvider/Test_DIRACCAProxyProvider.py index 0d679ba5a94..af066179249 100644 --- a/tests/Integration/Resources/ProxyProvider/Test_DIRACCAProxyProvider.py +++ b/tests/Integration/Resources/ProxyProvider/Test_DIRACCAProxyProvider.py @@ -12,11 +12,7 @@ from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error from DIRAC.Resources.ProxyProvider.ProxyProviderFactory import ProxyProviderFactory -# For Jenkins -for f in ['', 'TestCode', os.environ['DIRAC']]: - certsPath = os.path.join(f, 'DIRAC/Core/Security/test/certs') - if os.path.exists(certsPath): - break +certsPath = os.path.join(os.environ['DIRAC'], 'DIRAC/Core/Security/test/certs') diracTestCACFG = """ Resources @@ -26,17 +22,19 @@ DIRAC_TEST_CA { ProviderType = DIRACCA - CAConfigFile = %s + CertFile = %s + KeyFile = %s Match = Supplied = C, O, OU, CN Optional = emailAddress + DNOrder = C, O, OU, CN, emailAddress C = FR O = DIRAC OU = DIRAC TEST } } } -""" % os.path.join(certsPath, 'ca/openssl_config_ca.cnf') +""" % (os.path.join(certsPath, 'ca/ca.cert.pem'), os.path.join(certsPath, 'ca/ca.key.pem')) userCFG = """ Registry @@ -69,24 +67,11 @@ class DIRACCAPPTest(unittest.TestCase): @classmethod def setUpClass(cls): - __caPath = os.path.join(certsPath, 'ca') - cls.caConfigFile = os.path.join(__caPath, 'openssl_config_ca.cnf') - - # Save original configuration file - lines = [] - shutil.copyfile(cls.caConfigFile, cls.caConfigFile + 'bak') - with open(cls.caConfigFile, "r") as caCFG: - for line in caCFG: - if re.findall('=', re.sub(r'#.*', '', line)): - field = re.sub(r'#.*', '', line).replace(' ', '').rstrip().split('=')[0] - line = 'dir = %s #PUT THE RIGHT DIR HERE!\n' % (__caPath) if field == 'dir' else line - lines.append(line) - with open(cls.caConfigFile, "w") as caCFG: - caCFG.writelines(lines) + pass @classmethod def tearDownClass(cls): - shutil.move(cls.caConfigFile + 'bak', cls.caConfigFile) + pass def setUp(self): @@ -100,83 +85,44 @@ def setUp(self): self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') self.pp = result['Value'] - self.userDictClean = { - "FullName": "DIRAC test user", - "Email": "testuser@diracgrid.org" - } - self.userDictCleanDN = { - "DN": "/C=FR/O=DIRAC/OU=DIRAC TEST/CN=DIRAC test user/emailAddress=testuser@diracgrid.org", - "Email": "testuser@diracgrid.org" - } - self.userDictGroup = { - "FullName": "DIRAC test user", - "Email": "testuser@diracgrid.org", - "DiracGroup": "dirac_user" - } - def tearDown(self): pass def test_getProxy(self): - - result = self.pp.getProxy(self.userDictClean) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - chain = X509Chain() - chain.loadChainFromString(result['Value']['proxy']) - result = chain.getCredentials() + for dn, res in [('/C=FR/O=DIRAC/OU=DIRAC TEST/CN=DIRAC test user/emailAddress=testuser@diracgrid.org', True), + ('/C=FR/OU=DIRAC TEST/emailAddress=testuser@diracgrid.org', False), + ('/C=FR/OU=DIRAC/O=DIRAC TEST/emailAddress=testuser@diracgrid.org', False), + ('/C=FR/O=DIRAC/BADFIELD=DIRAC TEST/CN=DIRAC test user', False)]: + result = self.pp.getProxy(dn) + text = 'Must be ended %s%s' % ('successful' if res else 'with error', + ': %s' % result.get('Message', 'Error message is absent.')) + self.assertEqual(result['OK'], res, text) + if res: + chain = X509Chain() + chain.loadChainFromString(result['Value']) + result = chain.getCredentials() + self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') + credDict = result['Value'] + self.assertEqual(credDict['username'], 'testuser', + '%s, expected %s' % (credDict['username'], 'testuser')) + + def test_generateProxyDN(self): + + userDict = {"FullName": "John Doe", + "Email": "john.doe@nowhere.net", + "O": 'DIRAC', + 'OU': 'DIRAC TEST', + 'C': 'FR'} + result = self.pp.generateDN(**userDict) self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - credDict = result['Value'] - self.assertEqual(credDict['username'], 'testuser', - '%s, expected %s' % (credDict['username'], 'testuser')) - self.assertEqual(credDict['group'], 'dirac_user', - '%s, expected %s' % (credDict['group'], 'dirac_user')) - - def test_getProxyDN(self): - - result = self.pp.getProxy(self.userDictCleanDN) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - chain = X509Chain() - chain.loadChainFromString(result['Value']['proxy']) - result = chain.getCredentials() - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - credDict = result['Value'] - self.assertEqual(credDict['username'], 'testuser', - '%s, expected %s' % (credDict['username'], 'testuser')) - self.assertEqual(credDict['group'], 'dirac_user', - '%s, expected %s' % (credDict['group'], 'dirac_user')) - - def test_getProxyGroup(self): - - result = self.pp.getProxy(self.userDictGroup) + result = self.pp.getProxy(result['Value']) self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') chain = X509Chain() - chain.loadChainFromString(result['Value']['proxy']) + chain.loadChainFromString(result['Value']) result = chain.getCredentials() self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - credDict = result['Value'] - self.assertEqual(credDict['username'], 'testuser', - '%s, expected %s' % (credDict['username'], 'testuser')) - self.assertEqual(credDict['group'], 'dirac_user', - '%s, expected %s' % (credDict['group'], 'dirac_user')) - - def test_getUserDN(self): - - goodDN = '/C=FR/O=DIRAC/OU=DIRAC TEST/CN=DIRAC test user/emailAddress=testuser@diracgrid.org' - badDN_1 = '/C=FR/OU=DIRAC TEST/CN=DIRAC test user/emailAddress=testuser@diracgrid.org' - badDN_2 = '/C=FR/O=DIRAC/OU=DIRAC TEST/emailAddress=testuser@diracgrid.org' - badDN_3 = '/C=FR/O=DIRAC/OU=DIRAC TEST/CN=DIRAC test user' - result = self.pp.getUserDN(userDN=goodDN) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - result = self.pp.getUserDN(userDN=badDN_1) - self.assertFalse(result['OK'], 'Must be fail.') - result = self.pp.getUserDN(userDN=badDN_2) - self.assertFalse(result['OK'], 'Must be fail.') - result = self.pp.getUserDN(userDN=badDN_3) - self.assertFalse(result['OK'], 'Must be fail.') - userDict = {"FullName": "John Doe", "Email": "john.doe@nowhere.net"} - result = self.pp.getUserDN(userDict) - self.assertTrue(result['OK'], '\n%s' % result.get('Message') or 'Error message is absent.') - self.assertEqual(result['Value'], '/C=FR/O=DIRAC/OU=DIRAC TEST/CN=John Doe/emailAddress=john.doe@nowhere.net') + issuer = result['Value']['issuer'] + self.assertEqual(issuer, '/C=FR/O=DIRAC/OU=DIRAC TEST/CN=John Doe/emailAddress=john.doe@nowhere.net') if __name__ == '__main__':