From 602b1dba59d5fd18c237c2038daa5e345fba45d1 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Wed, 10 Aug 2016 08:19:25 -0700 Subject: [PATCH 01/13] Enforced standard python formatting --- server/recceiver/cfstore.py | 92 ++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 9f5909fd..1d572aa2 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -19,7 +19,7 @@ # -__all__ = ['CFProcessor'] +__all__ = ['CFProcessor'] class CFProcessor(service.Service): implements(interfaces.IProcessor) @@ -64,8 +64,8 @@ def commit(self, TR): if iocName and hostName and owner: updateChannelFinder(self.client, pvNames, hostName, iocName, time, owner) else: - _log.error('failed to initialize one or more of the following properties'+ - 'hostname: %s iocname: %s owner: %s',hostName,iocName,owner) + _log.error('failed to initialize one or more of the following properties' + + 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): ''' @@ -77,22 +77,22 @@ def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): e.g. user = abc might create a channel with owner = group-abc time = the time at which these channels are being created/modified ''' - if hostName == None or iocName == None: + if hostName is None or iocName is None: raise Exception('missing hostName or iocName') channels = [] checkPropertiesExist(client, owner) previousChannelsList = client.findByArgs([('hostName', hostName), ('iocName', iocName)]) - if previousChannelsList != None: + if previousChannelsList is not None: for ch in previousChannelsList: - if pvNames != None and ch[u'name'] in pvNames: - channels.append(updateChannel(ch,\ - owner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ + if pvNames is not None and ch[u'name'] in pvNames: + channels.append(updateChannel(ch, + owner=owner, + hostName=hostName, + iocName=iocName, + pvStatus='Active', time=time)) pvNames.remove(ch[u'name']) - elif pvNames == None or ch[u'name'] not in pvNames: + elif pvNames is None or ch[u'name'] not in pvNames: '''Orphan the channel : mark as inactive, keep the old hostName and iocName''' oldHostName = hostName oldIocName = iocName @@ -104,31 +104,31 @@ def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): oldIocName = prop[u'value'] if prop[u'name'] == u'time': oldTime = prop[u'value'] - channels.append(updateChannel(ch, \ - owner=owner, \ - hostName=oldHostName, \ - iocName=oldIocName, \ - pvStatus='InActive', \ + channels.append(updateChannel(ch, + owner=owner, + hostName=oldHostName, + iocName=oldIocName, + pvStatus='InActive', time=oldTime)) # now pvNames contains a list of pv's new on this host/ioc for pv in pvNames: - ch = client.findByArgs([('~name',pv)]) + ch = client.findByArgs([('~name', pv)]) if not ch: '''New channel''' - channels.append(createChannel(pv, \ - chOwner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ + channels.append(createChannel(pv, + chOwner=owner, + hostName=hostName, + iocName=iocName, + pvStatus='Active', time=time)) elif len(ch) == 1: '''update existing channel: exists but with a different hostName and/or iocName''' - channels.append(updateChannel(ch[0], \ - owner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ + channels.append(updateChannel(ch[0], + owner=owner, + hostName=hostName, + iocName=iocName, + pvStatus='Active', time=time)) client.set(channels=channels) @@ -138,35 +138,35 @@ def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActiv ''' # properties list devoid of hostName and iocName properties if channel[u'properties']: - channel[u'properties'] = [property for property in channel[u'properties'] \ - if property[u'name'] != 'hostName' \ - and property[u'name'] != 'iocName'\ + channel[u'properties'] = [property for property in channel[u'properties'] + if property[u'name'] != 'hostName' + and property[u'name'] != 'iocName' and property[u'name'] != 'pvStatus'] else: channel[u'properties'] = [] - if hostName != None: - channel[u'properties'].append({u'name' : 'hostName', u'owner' : owner, u'value': hostName}) - if iocName != None: - channel[u'properties'].append({u'name' : 'iocName', u'owner' : owner, u'value': iocName}) + if hostName is not None: + channel[u'properties'].append({u'name': 'hostName', u'owner': owner, u'value': hostName}) + if iocName is not None: + channel[u'properties'].append({u'name': 'iocName', u'owner': owner, u'value': iocName}) if pvStatus: - channel[u'properties'].append({u'name' : 'pvStatus', u'owner' : owner, u'value': pvStatus}) + channel[u'properties'].append({u'name': 'pvStatus', u'owner': owner, u'value': pvStatus}) if time: - channel[u'properties'].append({u'name' : 'time', u'owner' : owner, u'value': time}) + channel[u'properties'].append({u'name': 'time', u'owner': owner, u'value': time}) return channel def createChannel(chName, chOwner, hostName=None, iocName=None, pvStatus='InActive', time=None): ''' Helper to create a channel object with the required properties ''' - ch = {u'name':chName,u'owner':chOwner,u'properties':[]} - if hostName != None: - ch[u'properties'].append({u'name' : 'hostName', u'owner' : chOwner, u'value': hostName}) - if iocName != None: - ch[u'properties'].append({u'name' : 'iocName', u'owner' : chOwner, u'value': iocName}) + ch = {u'name': chName, u'owner': chOwner, u'properties': []} + if hostName is not None: + ch[u'properties'].append({u'name': 'hostName', u'owner': chOwner, u'value': hostName}) + if iocName is not None: + ch[u'properties'].append({u'name': 'iocName', u'owner': chOwner, u'value': iocName}) if pvStatus: - ch[u'properties'].append({u'name' : 'pvStatus', u'owner' : chOwner, u'value': pvStatus}) + ch[u'properties'].append({u'name': 'pvStatus', u'owner': chOwner, u'value': pvStatus}) if time: - ch[u'properties'].append({u'name' : 'time', u'owner' : chOwner, u'value': time}) + ch[u'properties'].append({u'name': 'time', u'owner': chOwner, u'value': time}) return ch def checkPropertiesExist(client, propOwner): @@ -175,9 +175,9 @@ def checkPropertiesExist(client, propOwner): ''' requiredProperties = ['hostName', 'iocName', 'pvStatus', 'time'] for propName in requiredProperties: - if client.findProperty(propName) == None: + if client.findProperty(propName) is None: try: - client.set(property={u'name': propName, u'owner':propOwner}) + client.set(property={u'name': propName, u'owner': propOwner}) except Exception as e: _log.error('Failed to create the property %s: %s',propName, e) From 14d8cb18824bd7ac52f658c57682eeb9e965bd59 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 22 Aug 2016 11:48:40 -0700 Subject: [PATCH 02/13] Improvements to cf-store as defined in issue #11 --- server/recceiver/cfstore.py | 176 +++++++++++++++++++++------------ server/recceiver/interfaces.py | 2 +- 2 files changed, 114 insertions(+), 64 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 1d572aa2..1a93b88c 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -5,6 +5,8 @@ from zope.interface import implements from twisted.application import service +from twisted.internet.threads import deferToThread +import time import interfaces import datetime @@ -15,7 +17,7 @@ # delrec = a set() of records which are being removed # infos = dictionary of client infos # recinfos = additional infos being added to existing records -# "recid: {key:vale}" +# "recid: {key:value}" # @@ -26,9 +28,12 @@ class CFProcessor(service.Service): def __init__(self, name, conf): _log.info("CF_INIT %s", name) - self.name,self.conf = name,conf - - + self.name, self.conf = name, conf + self.channel_dict = dict() + self.iocs = dict() + self.client = None + self.currentTime = getCurrentTime + def startService(self): service.Service.startService(self) self.running = 1 @@ -36,8 +41,9 @@ def startService(self): from channelfinder import ChannelFinderClient # Using the default python cf-client. # The usr, username, and password are provided by the channelfinder._conf module. - self.client = ChannelFinderClient() - + if self.client is None: # For setting up mock test client + self.client = ChannelFinderClient() + def stopService(self): service.Service.stopService(self) #Set channels to inactive and close connection to client @@ -48,71 +54,95 @@ def commit(self, TR): if _log.isEnabledFor(logging.DEBUG): _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] - iocName=TR.src.port - hostName=TR.src.host - - ''' - Currently using the hostIP and the iocPort - if 'IOCNAME' in TR.infos: - iocName = TR.infos['IOCNAME'] - if 'HOSTNAME' in TR.infos: - hostName = TR.infos['HOSTNAME'] - ''' + delrec = TR.delrec + iocName = TR.src.port + hostName = TR.src.host + iocid = hostName + str(iocName) owner = TR.infos.get('CF_USERNAME') or TR.infos.get('ENGINEER') or self.conf.get('username', 'cfstore') - time = str(datetime.datetime.now()) - + time = self.currentTime() + if TR.initial: + self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "channelcount": 0} # add IOC to source list + if TR.connected: + for pv in pvNames: + if pv in self.channel_dict: + if iocid not in self.channel_dict[pv]: + self.channel_dict[pv].append(iocid) # add iocname to pvName in dict + self.iocs[iocid]["channelcount"] += 1 + # else: # info already in dictionary, possibly recovering from ioc disconnect + else: + self.channel_dict[pv] = [iocid] # add pvName with [iocname] in dict + self.iocs[iocid]["channelcount"] += 1 + for pv in delrec: + if iocid in self.channel_dict[pv]: + self.channel_dict[pv].remove(iocid) + self.iocs[iocid]["channelcount"] -= 1 + if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs + del self.channel_dict[pv] + #pvNames.remove(pv) + else: # CASE: IOC Disconnected + keys = self.channel_dict.keys() + for ch in keys: + if iocid in self.channel_dict[ch]: + self.channel_dict[ch].remove(iocid) + if len(self.channel_dict[ch]) <= 0: # case: channel has no more iocs + del self.channel_dict[ch] + self.iocs[iocid]['channelcount'] -= 1 + # else: + # pass # ch not connected to ioc? + if iocName and hostName and owner: - updateChannelFinder(self.client, pvNames, hostName, iocName, time, owner) + #print "channels_dict:\n", self.channel_dict + return deferToThread(poll, __updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) else: _log.error('failed to initialize one or more of the following properties' + 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) - -def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): - ''' - pvNames = list of pvNames - ([] permitted will effectively remove the hostname, iocname from all channels) - hostName = pv hostName (None not permitted) - iocName = pv iocName (None not permitted) - owner = the owner of the channels and properties being added, this can be different from the user - e.g. user = abc might create a channel with owner = group-abc - time = the time at which these channels are being created/modified - ''' + +def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, time, owner): if hostName is None or iocName is None: raise Exception('missing hostName or iocName') channels = [] checkPropertiesExist(client, owner) - previousChannelsList = client.findByArgs([('hostName', hostName), ('iocName', iocName)]) - if previousChannelsList is not None: - for ch in previousChannelsList: - if pvNames is not None and ch[u'name'] in pvNames: - channels.append(updateChannel(ch, - owner=owner, - hostName=hostName, - iocName=iocName, - pvStatus='Active', - time=time)) - pvNames.remove(ch[u'name']) - elif pvNames is None or ch[u'name'] not in pvNames: - '''Orphan the channel : mark as inactive, keep the old hostName and iocName''' - oldHostName = hostName - oldIocName = iocName - oldTime = time - for prop in ch[u'properties']: - if prop[u'name'] == u'hostName': - oldHostName = prop[u'value'] - if prop[u'name'] == u'iocName': - oldIocName = prop[u'value'] - if prop[u'name'] == u'time': - oldTime = prop[u'value'] - channels.append(updateChannel(ch, - owner=owner, - hostName=oldHostName, - iocName=oldIocName, - pvStatus='InActive', - time=oldTime)) + old = client.findByArgs([('hostName', hostName), ('iocName', iocName)]) + if old is not None: + for ch in old: + if new == [] or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc + if ch[u'name'] in channels_dict: + channels.append(updateChannel(ch, + owner=owner, + hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], + iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], + pvStatus='Active', + time=time)) + else: + '''Orphan the channel : mark as inactive, keep the old hostName and iocName''' + oldHostName = hostName + oldIocName = iocName + oldTime = time + for prop in ch[u'properties']: + if prop[u'name'] == u'hostName': + oldHostName = prop[u'value'] + if prop[u'name'] == u'iocName': + oldIocName = prop[u'value'] + if prop[u'name'] == u'time': + oldTime = prop[u'value'] + channels.append(updateChannel(ch, + owner=owner, + hostName=oldHostName, + iocName=oldIocName, + pvStatus='Inactive', + time=oldTime)) + else: + if ch in new: # case: channel in old and new + channels.append(updateChannel(ch, + owner=owner, + hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], + iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], + pvStatus='Active', + time=time)) + new.remove(ch[u'name']) # now pvNames contains a list of pv's new on this host/ioc - for pv in pvNames: + for pv in new: ch = client.findByArgs([('~name', pv)]) if not ch: '''New channel''' @@ -122,7 +152,7 @@ def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): iocName=iocName, pvStatus='Active', time=time)) - elif len(ch) == 1: + else: '''update existing channel: exists but with a different hostName and/or iocName''' channels.append(updateChannel(ch[0], owner=owner, @@ -141,7 +171,8 @@ def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActiv channel[u'properties'] = [property for property in channel[u'properties'] if property[u'name'] != 'hostName' and property[u'name'] != 'iocName' - and property[u'name'] != 'pvStatus'] + and property[u'name'] != 'pvStatus' + and property[u'name'] != 'time'] else: channel[u'properties'] = [] if hostName is not None: @@ -179,5 +210,24 @@ def checkPropertiesExist(client, propOwner): try: client.set(property={u'name': propName, u'owner': propOwner}) except Exception as e: - _log.error('Failed to create the property %s: %s',propName, e) + _log.error('Failed to create the property %s: %s', propName, e) + +def getCurrentTime(): + return str(datetime.datetime.now()) + +def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): + sleep = 1 + while 1: + try: + update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) + print "-------------------\nTRUE\n-------------------" + return True + except Exception as e: # should catch only network errors + print "SLEEP: ", sleep, "\n-------------------------\n", + print "", channels_dict, "\n-------------------------\n" + time.sleep(sleep) + if sleep >= 60: + sleep = 60 + else: + sleep *= 1.5 diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 7b280e32..81aeff51 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -9,7 +9,7 @@ class ITransaction(Interface): src = Attribute('Source Address.') addrec = Attribute("""Records being added - recid: ('recname', 'rectype', {'key':'val'}) + {recid: ('recname', 'rectype', {'key':'val'})} """) delrec = Attribute('A set() of recids which are being removed') From 03743aa9d101469c06beaadc203f1e2ffdef7aca Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 22 Aug 2016 11:55:52 -0700 Subject: [PATCH 03/13] Unit tests for cf-store, including mock channelfinder client. Tests currently focused on IOC disconnections. --- server/recceiver/mock_client.py | 61 ++++++++++++++++++++ server/recceiver/test_cfstore.py | 97 ++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 server/recceiver/mock_client.py create mode 100644 server/recceiver/test_cfstore.py diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py new file mode 100644 index 00000000..4ab01137 --- /dev/null +++ b/server/recceiver/mock_client.py @@ -0,0 +1,61 @@ +class mock_client(): + def __init__(self): + self.cf = {} + + def findByArgs(self, args): + result = [] + + if len(args) > 1: # returning old + for ch in self.cf: + name_flag = False + prop_flag = False + for props in self.cf[ch][u'properties']: + if props[u'name'] == args[0][0]: + if props[u'value'] == args[0][1]: + name_flag = True + if props[u'name'] == args[1][0]: + if props[u'value'] == args[1][1]: + prop_flag = True + if name_flag and prop_flag: + result.append(self.cf[ch]) + return result + else: + if args[0][1] in self.cf: + return [self.cf[args[0][1]]] + + def findProperty(self, prop_name): + # print "findProperty: ", prop_name + pass + + def set(self, channels): + #print "channels:\n", channels + for channel in channels: + self.addChannel(channel) + #print "CF:\n", self.cf + + def addChannel(self, channel): + self.cf[channel[u'name']] = channel + + +class mock_conf(): + def __init__(self): + pass + + def get(self, name, target): + return "cf-update" + +class mock_src(): + def __init__(self): + self.host = '10.0.2.15' + self.port = 43891 + +class mock_TR(): + def __init__(self): + #self.addrec = {5570560: ('test:lo', 'longout'), 5636096: ('test:Msg-I', 'stringin'), 5701632: ('test:li', 'longin'), 5767168: ('test:State-Sts', 'mbbi')} + #self.addrec = {1: ('name', 'longout')} + self.addrec = {} + self.src = mock_src() + self.delrec = () + self.infos = {'CF_USERNAME': 'cf-update', 'ENGINEER': 'cf-engi'} + self.initial = True + self.connected = True \ No newline at end of file diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py new file mode 100644 index 00000000..37f97942 --- /dev/null +++ b/server/recceiver/test_cfstore.py @@ -0,0 +1,97 @@ +from twisted.trial.unittest import TestCase +from twisted.internet import defer +from mock_client import mock_conf +from mock_client import mock_TR +from mock_client import mock_client +from cfstore import CFProcessor + + +class MyTestCase(TestCase): + def setUp(self): + conf = mock_conf() + self.cfstore = CFProcessor("cf", conf) + self.cfstore.currentTime = getTime + self.maxDiff = None + + @defer.inlineCallbacks + def test_3_iocs(self): + cfclient = mock_client() + self.cfstore.client = cfclient + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR1) + self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + TR2 = mock_TR() + TR2.src.host = 'testhostb' + TR2.src.port = 2222 + TR2.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR2) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + TR3 = mock_TR() + TR3.src.host = 'testhostc' + TR3.src.port = 3333 + TR3.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} + deferred = yield self.cfstore.commit(TR3) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TRFinal = mock_TR() + TRFinal.initial = False + TRFinal.addrec = {} + TRFinal.connected = False + TRFinal.src.host = 'testhostc' + TRFinal.src.port = 3333 + deferred = yield self.cfstore.commit(TRFinal) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + def assertCommit(self, keys, dict): + self.assertSequenceEqual(self.cfstore.client.cf.keys(), keys) + self.assertDictEqual(self.cfstore.client.cf, dict) + + +def abbr(name, hostname, iocname, status): + return {u'owner': 'cf-update', + u'name': name, + u'properties': [ + {u'owner': 'cf-update', u'name': 'hostName', + u'value': hostname}, + {u'owner': 'cf-update', u'name': 'iocName', + u'value': iocname}, + {u'owner': 'cf-update', u'name': 'pvStatus', + u'value': status}, + {u'owner': 'cf-update', u'name': 'time', + u'value': '2016-08-18 12:33:09.953985'}]} + + +def getTime(): + return '2016-08-18 12:33:09.953985' + +if __name__ == '__main__': + unittest.main() From 6bcc2e9b46bf364890e0522c13a11c337b8a985c Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Tue, 23 Aug 2016 10:51:10 -0700 Subject: [PATCH 04/13] Added new tests for cf-store, including those for channelfinder client disconnections. --- server/recceiver/cfstore.py | 12 ++++--- server/recceiver/mock_client.py | 59 +++++++++++++++++++------------- server/recceiver/test_cfstore.py | 55 ++++++++++++++++++++++++++--- 3 files changed, 93 insertions(+), 33 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 1a93b88c..e7207f88 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -217,14 +217,16 @@ def getCurrentTime(): def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): sleep = 1 - while 1: + success = False + while not success: try: update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) - print "-------------------\nTRUE\n-------------------" - return True + # print "-------------------\nTRUE\n-------------------" + success = True + return success except Exception as e: # should catch only network errors - print "SLEEP: ", sleep, "\n-------------------------\n", - print "", channels_dict, "\n-------------------------\n" + # print "SLEEP: ", sleep, "\n-------------------------\n", + # print "", channels_dict, "\n-------------------------\n" time.sleep(sleep) if sleep >= 60: sleep = 60 diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index 4ab01137..70f6931e 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -1,37 +1,48 @@ class mock_client(): def __init__(self): self.cf = {} + self.connected = True + self.fail_set = False def findByArgs(self, args): - result = [] - - if len(args) > 1: # returning old - for ch in self.cf: - name_flag = False - prop_flag = False - for props in self.cf[ch][u'properties']: - if props[u'name'] == args[0][0]: - if props[u'value'] == args[0][1]: - name_flag = True - if props[u'name'] == args[1][0]: - if props[u'value'] == args[1][1]: - prop_flag = True - if name_flag and prop_flag: - result.append(self.cf[ch]) - return result + if not self.connected: + raise StandardError("fake 404") else: - if args[0][1] in self.cf: - return [self.cf[args[0][1]]] + result = [] + + if len(args) > 1: # returning old + for ch in self.cf: + name_flag = False + prop_flag = False + for props in self.cf[ch][u'properties']: + if props[u'name'] == args[0][0]: + if props[u'value'] == args[0][1]: + name_flag = True + if props[u'name'] == args[1][0]: + if props[u'value'] == args[1][1]: + prop_flag = True + if name_flag and prop_flag: + result.append(self.cf[ch]) + return result + else: + if args[0][1] in self.cf: + return [self.cf[args[0][1]]] def findProperty(self, prop_name): - # print "findProperty: ", prop_name - pass + if not self.connected: + raise StandardError("fake 404") + else: + # print "findProperty: ", prop_name + pass def set(self, channels): - #print "channels:\n", channels - for channel in channels: - self.addChannel(channel) - #print "CF:\n", self.cf + if not self.connected or self.fail_set: # if not fail_set? + raise StandardError("fake 404") + else: + #print "channels:\n", channels + for channel in channels: + self.addChannel(channel) + #print "CF:\n", self.cf def addChannel(self, channel): self.cf[channel[u'name']] = channel diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py index 37f97942..ea880a17 100644 --- a/server/recceiver/test_cfstore.py +++ b/server/recceiver/test_cfstore.py @@ -5,6 +5,9 @@ from mock_client import mock_client from cfstore import CFProcessor +import threading +import time + class MyTestCase(TestCase): def setUp(self): @@ -15,8 +18,9 @@ def setUp(self): @defer.inlineCallbacks def test_3_iocs(self): - cfclient = mock_client() - self.cfstore.client = cfclient + self.cfclient = mock_client() + self.cfclient.connected = True + self.cfstore.client = self.cfclient self.cfstore.startService() TR1 = mock_TR() TR1.src.host = 'testhosta' @@ -41,6 +45,7 @@ def test_3_iocs(self): u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + TR3 = mock_TR() TR3.src.host = 'testhostc' TR3.src.port = 3333 @@ -71,10 +76,52 @@ def test_3_iocs(self): u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + @defer.inlineCallbacks + def test_no_CFS(self): + self.cfclient = mock_client() + self.cfclient.connected = False + self.cfstore.client = self.cfclient + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + threading._start_new_thread(self.simulate_reconnect, ()) + deferred = yield self.cfstore.commit(TR1) + self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + @defer.inlineCallbacks + def test_set_fail(self): + self.cfclient = mock_client() + self.cfclient.connected = True + self.cfclient.fail_set = True + self.cfstore.client = self.cfclient + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + threading._start_new_thread(self.simulate_reconnect, ()) + deferred = yield self.cfstore.commit(TR1) + self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + def assertCommit(self, keys, dict): self.assertSequenceEqual(self.cfstore.client.cf.keys(), keys) self.assertDictEqual(self.cfstore.client.cf, dict) + def simulate_reconnect(self): + time.sleep(3) + self.cfclient.connected = True + self.cfclient.fail_set = False def abbr(name, hostname, iocname, status): return {u'owner': 'cf-update', @@ -93,5 +140,5 @@ def abbr(name, hostname, iocname, status): def getTime(): return '2016-08-18 12:33:09.953985' -if __name__ == '__main__': - unittest.main() +# if __name__ == '__main__': +# unittest.main() From a07102150e14ec7956387142e26a8a775259f4c7 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Thu, 25 Aug 2016 10:51:10 -0700 Subject: [PATCH 05/13] Fixed cases where commit attempted to make no changes and would poll indefinitely. Fixed issue where recceiver was ignoring keepalive response and causing a timeout after 30 seconds. Added more cases to commit order tests. --- server/recceiver/cfstore.py | 21 +++++-- server/recceiver/recast.py | 3 + server/recceiver/test_cfstore.py | 104 ++++++++++++++++++++++++++++--- 3 files changed, 115 insertions(+), 13 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index e7207f88..e7c4951c 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -8,6 +8,7 @@ from twisted.internet.threads import deferToThread import time import interfaces +import requests.exceptions import datetime # ITRANSACTION FORMAT: @@ -160,7 +161,11 @@ def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, ti iocName=iocName, pvStatus='Active', time=time)) - client.set(channels=channels) + if len(channels) != 0: # Fixes a potential server error which occurs when a client.set results in no changes + client.set(channels=channels) + else: + if len(old) != 0: + client.set(channels=channels) def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActive', time=None): ''' @@ -212,24 +217,28 @@ def checkPropertiesExist(client, propOwner): except Exception as e: _log.error('Failed to create the property %s: %s', propName, e) + def getCurrentTime(): return str(datetime.datetime.now()) + def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): sleep = 1 success = False while not success: try: update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) - # print "-------------------\nTRUE\n-------------------" success = True return success - except Exception as e: # should catch only network errors - # print "SLEEP: ", sleep, "\n-------------------------\n", - # print "", channels_dict, "\n-------------------------\n" - time.sleep(sleep) + #except requests.exceptions.HTTPError as e: # should catch only network errors + except StandardError as e: + _log.debug("error: " + str(e.message)) + _log.debug("SLEEP: " + str(sleep)) + _log.debug(str(channels_dict)) if sleep >= 60: sleep = 60 + time.sleep(sleep) else: + time.sleep(sleep) sleep *= 1.5 diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index d662fe74..c35e83c0 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -87,6 +87,8 @@ def timed(self): self.phase = 2 self.nonce = random.randint(0,0xffffffff) self.writeMsg(0x8002, _ping.pack(self.nonce)) + _log.debug("ping nonce: " + str(self.nonce)) + self.sess.proto.transport.resumeProducing() def getInitialState(self): return (self.recvHeader, 8) @@ -124,6 +126,7 @@ def recvPong(self, body): _log.error('pong nonce does not match! %s!=%s',nonce,self.nonce) self.transport.loseConnection() else: + _log.debug('pong nonce match') self.phase = 1 return self.getInitialState() diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py index ea880a17..dea6c201 100644 --- a/server/recceiver/test_cfstore.py +++ b/server/recceiver/test_cfstore.py @@ -60,13 +60,13 @@ def test_3_iocs(self): u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) - TRFinal = mock_TR() - TRFinal.initial = False - TRFinal.addrec = {} - TRFinal.connected = False - TRFinal.src.host = 'testhostc' - TRFinal.src.port = 3333 - deferred = yield self.cfstore.commit(TRFinal) + TR4 = mock_TR() + TR4.initial = False + TR4.addrec = {} + TR4.connected = False + TR4.src.host = 'testhostc' + TR4.src.port = 3333 + deferred = yield self.cfstore.commit(TR4) self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), @@ -76,6 +76,96 @@ def test_3_iocs(self): u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + TR5 = mock_TR() + TR5.src.host = 'testhostc' + TR5.src.port = 3333 + TR5.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} + deferred = yield self.cfstore.commit(TR5) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR6 = mock_TR() + TR6.initial = False + TR6.addrec = {} + TR6.connected = False + TR6.src.host = 'testhostb' + TR6.src.port = 2222 + deferred = yield self.cfstore.commit(TR6) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + TR7 = mock_TR() + TR7.initial = False + TR7.addrec = {} + TR7.connected = False + TR7.src.host = 'testhosta' + TR7.src.port = 1111 + deferred = yield self.cfstore.commit(TR7) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + + TR8 = mock_TR() + TR8.initial = False + TR8.addrec = {} + TR8.connected = False + TR8.src.host = 'testhostc' + TR8.src.port = 3333 + deferred = yield self.cfstore.commit(TR8) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Inactive'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Inactive'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + + TR9 = mock_TR() + TR9.src.host = 'testhostb' + TR9.src.port = 2222 + TR9.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR9) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR10 = mock_TR() + TR10.src.host = 'testhosta' + TR10.src.port = 1111 + TR10.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 4: ('ch4', 'stringin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR10) + self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch2': abbr(u'ch2', 'testhosta', 1111, 'Active'), + u'ch3': abbr(u'ch3', 'testhosta', 1111, 'Active'), + u'ch4': abbr(u'ch4', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + @defer.inlineCallbacks def test_no_CFS(self): self.cfclient = mock_client() From 7c90a956ffde66103af628905348c441c8326ce7 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Tue, 30 Aug 2016 12:18:18 -0700 Subject: [PATCH 06/13] Added locking to cf-store deferred. Added script for outputting channelfinder service data to a file, same format as cf-store data file print. Added script for creating 1000~ identical iocs using the recsync demo st.cmd. --- server/recceiver/cfstore.py | 73 +++++++++++++++++++--- server/recceiver/mock_client.py | 9 ++- server/recceiver/scripts/print_cf_data.py | 36 +++++++++++ server/recceiver/scripts/test_mock_iocs.py | 49 +++++++++++++++ 4 files changed, 158 insertions(+), 9 deletions(-) create mode 100644 server/recceiver/scripts/print_cf_data.py create mode 100644 server/recceiver/scripts/test_mock_iocs.py diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index e7c4951c..7146eaf0 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -6,10 +6,15 @@ from zope.interface import implements from twisted.application import service from twisted.internet.threads import deferToThread +from twisted.internet.defer import DeferredLock +from twisted.internet import defer +from operator import itemgetter import time import interfaces import requests.exceptions import datetime +import os +import json # ITRANSACTION FORMAT: # @@ -34,6 +39,7 @@ def __init__(self, name, conf): self.iocs = dict() self.client = None self.currentTime = getCurrentTime + self.lock = DeferredLock() def startService(self): service.Service.startService(self) @@ -44,14 +50,24 @@ def startService(self): # The usr, username, and password are provided by the channelfinder._conf module. if self.client is None: # For setting up mock test client self.client = ChannelFinderClient() + self.clean_service() def stopService(self): service.Service.stopService(self) #Set channels to inactive and close connection to client self.running = 0 + self.clean_service() _log.info("CF_STOP") - def commit(self, TR): + @defer.inlineCallbacks + def commit(self, transaction_record): + yield self.lock.acquire() + try: + yield deferToThread(self.__commit__, transaction_record) + finally: + self.lock.release() + + def __commit__(self, TR): if _log.isEnabledFor(logging.DEBUG): _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] @@ -62,7 +78,7 @@ def commit(self, TR): owner = TR.infos.get('CF_USERNAME') or TR.infos.get('ENGINEER') or self.conf.get('username', 'cfstore') time = self.currentTime() if TR.initial: - self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "channelcount": 0} # add IOC to source list + self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "owner": owner, "channelcount": 0} # add IOC to source list if TR.connected: for pv in pvNames: if pv in self.channel_dict: @@ -92,11 +108,41 @@ def commit(self, TR): # pass # ch not connected to ioc? if iocName and hostName and owner: - #print "channels_dict:\n", self.channel_dict - return deferToThread(poll, __updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) + poll(__updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) else: _log.error('failed to initialize one or more of the following properties' + 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) + #unlock in wrapper function + dict_to_file(self.channel_dict, self.iocs) + + def clean_service(self): + while 1: + try: + _log.debug("cleaning...") + channels = self.client.findByArgs([('pvStatus', 'Active')]) + new_channels = [] + for ch in channels: + new_channels.append(clean_channel(ch)) + if len(new_channels) > 0: + self.client.set(channels=new_channels) + return + except StandardError as e: + _log.debug("cleaning failed, retrying: " + str(e.message)) + time.sleep(1) + +def dict_to_file(dict, iocs): + filename = "/home/devuser/recsyncdata" + if os.path.isfile(filename): + os.remove(filename) + list = [] + for key in dict: + list.append([key, iocs[dict[key][-1]]['hostname'], iocs[dict[key][-1]]['iocname']]) + + list.sort(key=itemgetter(0)) + + with open(filename, 'wrx') as f: + json.dump(list, f) + def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, time, owner): if hostName is None or iocName is None: @@ -109,7 +155,7 @@ def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, ti if new == [] or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc if ch[u'name'] in channels_dict: channels.append(updateChannel(ch, - owner=owner, + owner=iocs[channels_dict[ch[u'name']][-1]]["owner"], hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], pvStatus='Active', @@ -135,7 +181,7 @@ def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, ti else: if ch in new: # case: channel in old and new channels.append(updateChannel(ch, - owner=owner, + owner=iocs[channels_dict[ch[u'name']][-1]]["owner"], hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], pvStatus='Active', @@ -167,7 +213,7 @@ def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, ti if len(old) != 0: client.set(channels=channels) -def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActive', time=None): +def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='Inactive', time=None): ''' Helper to update a channel object so as to not affect the existing properties ''' @@ -190,7 +236,18 @@ def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActiv channel[u'properties'].append({u'name': 'time', u'owner': owner, u'value': time}) return channel -def createChannel(chName, chOwner, hostName=None, iocName=None, pvStatus='InActive', time=None): +def clean_channel(channel): + # properties list devoid of hostName and iocName properties + if channel[u'properties']: + channel[u'properties'] = [property for property in channel[u'properties'] + if property[u'name'] != 'pvStatus'] + else: + channel[u'properties'] = [] + + channel[u'properties'].append({u'name': 'pvStatus', u'owner': channel['owner'], u'value': 'Inactive'}) + return channel + +def createChannel(chName, chOwner, hostName=None, iocName=None, pvStatus='Inactive', time=None): ''' Helper to create a channel object with the required properties ''' diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index 70f6931e..927920dd 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -25,8 +25,15 @@ def findByArgs(self, args): result.append(self.cf[ch]) return result else: - if args[0][1] in self.cf: + if args[0][0] == '~name' and args[0][1] in self.cf: return [self.cf[args[0][1]]] + if args[0][0] == 'pvStatus' and args[0][1] == 'Active': + for ch in self.cf: + for prop in self.cf[ch]['properties']: + if prop['name'] == 'pvStatus': + if prop['value'] == 'Active': + result.append(self.cf[ch]) + return result def findProperty(self, prop_name): if not self.connected: diff --git a/server/recceiver/scripts/print_cf_data.py b/server/recceiver/scripts/print_cf_data.py new file mode 100644 index 00000000..686a6202 --- /dev/null +++ b/server/recceiver/scripts/print_cf_data.py @@ -0,0 +1,36 @@ +from channelfinder import ChannelFinderClient +import json +import os +from operator import itemgetter +filename = "/home/devuser/cfdata" # change this to output file name +client = ChannelFinderClient() + + +def get_cf_data(client): + channels = client.findByArgs([('pvStatus', 'Active')]) + + for ch in channels: + ch.pop('owner', None) + ch.pop('tags', None) + for prop in ch['properties']: + if prop['name'] == 'hostName': + ch['hostName'] = prop['value'] + if prop['name'] == 'iocName': + ch['iocName'] = prop['value'] + ch.pop('properties', None) + return channels + +channels = get_cf_data(client) + +if os.path.isfile(filename): + os.remove(filename) + +new_list = [] + +for channel in channels: + new_list.append([channel['name'], channel['hostName'], int(channel['iocName'])]) + +new_list.sort(key=itemgetter(0)) + +with open(filename, 'wrx') as f: + json.dump(new_list, f) diff --git a/server/recceiver/scripts/test_mock_iocs.py b/server/recceiver/scripts/test_mock_iocs.py new file mode 100644 index 00000000..9062a31b --- /dev/null +++ b/server/recceiver/scripts/test_mock_iocs.py @@ -0,0 +1,49 @@ +import os +import threading +import sys +import time +import signal + +iocexecutable = "st.cmd" + + +def startIOC(): + # conf needs to be set + pid, fd = os.forkpty() + if pid == 0: + os.chdir("/home/devuser/git/skinner/recsync/client/iocBoot/iocdemo") + os.execv("st.cmd", ['']) + return pid, fd + + +def readfd(fd): + while 1: + empt = str(os.read(fd, 16384).strip("\n")) + + +def handler(signum, frame): + global pids + for pid in pids: + os.kill(pid, signal.SIGKILL) + sys.exit() + + +def main(): + global pids + pids = [] + signal.signal(signal.SIGTERM, handler) + os.chdir(os.path.dirname(os.path.abspath(sys.argv[0]))) # Uses a filename, not good, also only works on linux? + for i in range(0, 999): + iocpid, iocfd = startIOC() + pids.append(iocpid) + print "len pids: ", len(pids) + iocthread = threading.Thread(group=None, target=readfd, args=(iocfd,), name="iocthread", kwargs={}) + iocthread.start() + try: + while 1: + time.sleep(1) + except KeyboardInterrupt: + sys.exit() + +if __name__ == '__main__': + main() From d892d303e787055c5915eb3be3582edd44c9089e Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Thu, 1 Sep 2016 10:33:01 -0700 Subject: [PATCH 07/13] 'channelcount' is now used to clean ioc dictionary, writes to error log if it becomes negative. Glassfish server errors are now caught as requests.exceptions.HTTPError instead of StandardError. Mock_client now also uses HTTPError. Mock client uses IPV4Address as its source, rather than an arbitrary object. Recast no longer pauses accepting input, also removed re-initiating listening. test_cfstore cleaned up threading and test commits. --- server/recceiver/cfstore.py | 59 ++++--- server/recceiver/mock_client.py | 15 +- server/recceiver/recast.py | 3 +- server/recceiver/scripts/test_mock_iocs.py | 4 +- server/recceiver/test_cfstore.py | 187 +++++++++++---------- 5 files changed, 139 insertions(+), 129 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 7146eaf0..4ace2fb3 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -2,7 +2,7 @@ import logging _log = logging.getLogger(__name__) - +from requests import HTTPError from zope.interface import implements from twisted.application import service from twisted.internet.threads import deferToThread @@ -11,7 +11,6 @@ from operator import itemgetter import time import interfaces -import requests.exceptions import datetime import os import json @@ -93,6 +92,10 @@ def __commit__(self, TR): if iocid in self.channel_dict[pv]: self.channel_dict[pv].remove(iocid) self.iocs[iocid]["channelcount"] -= 1 + if self.iocs[iocid]['channelcount'] == 0: + self.iocs.pop(iocid, None) + elif self.iocs[iocid]['channelcount'] < 0: + _log.error("channel count negative!") if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs del self.channel_dict[pv] #pvNames.remove(pv) @@ -104,6 +107,10 @@ def __commit__(self, TR): if len(self.channel_dict[ch]) <= 0: # case: channel has no more iocs del self.channel_dict[ch] self.iocs[iocid]['channelcount'] -= 1 + if self.iocs[iocid]['channelcount'] == 0: + self.iocs.pop(iocid, None) + elif self.iocs[iocid]['channelcount'] < 0: + _log.error("channel count negative!") # else: # pass # ch not connected to ioc? @@ -116,19 +123,27 @@ def __commit__(self, TR): dict_to_file(self.channel_dict, self.iocs) def clean_service(self): + sleep = 1 while 1: try: - _log.debug("cleaning...") + if _log.isEnabledFor(logging.DEBUG): + _log.debug("cleaning...") channels = self.client.findByArgs([('pvStatus', 'Active')]) - new_channels = [] - for ch in channels: - new_channels.append(clean_channel(ch)) - if len(new_channels) > 0: - self.client.set(channels=new_channels) - return - except StandardError as e: - _log.debug("cleaning failed, retrying: " + str(e.message)) - time.sleep(1) + if channels is not None: + #_log.debug("chs: " + str(channels)) + new_channels = [] + for ch in channels or []: + new_channels.append(clean_channel(ch)) + if len(new_channels) > 0: + self.client.set(channels=new_channels) + return + + except HTTPError as e: + _log.error("cleaning failed, retrying: " + str(e.message)) + + finally: + time.sleep(min(60, sleep)) + sleep *= 1.5 def dict_to_file(dict, iocs): filename = "/home/devuser/recsyncdata" @@ -210,7 +225,7 @@ def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, ti if len(channels) != 0: # Fixes a potential server error which occurs when a client.set results in no changes client.set(channels=channels) else: - if len(old) != 0: + if old and len(old) != 0: client.set(channels=channels) def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='Inactive', time=None): @@ -287,15 +302,11 @@ def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, ti update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) success = True return success - #except requests.exceptions.HTTPError as e: # should catch only network errors - except StandardError as e: - _log.debug("error: " + str(e.message)) - _log.debug("SLEEP: " + str(sleep)) - _log.debug(str(channels_dict)) - if sleep >= 60: - sleep = 60 - time.sleep(sleep) - else: - time.sleep(sleep) - sleep *= 1.5 + except HTTPError as e: # should catch only network errors + if _log.isEnabledFor(logging.DEBUG): + _log.debug("error: " + str(e.message)) + _log.debug("SLEEP: " + str(min(60, sleep))) + _log.debug(str(channels_dict)) + time.sleep(min(60, sleep)) + sleep *= 1.5 diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index 927920dd..a46f455e 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -1,3 +1,5 @@ +from twisted.internet.address import IPv4Address +from requests import HTTPError class mock_client(): def __init__(self): self.cf = {} @@ -6,7 +8,7 @@ def __init__(self): def findByArgs(self, args): if not self.connected: - raise StandardError("fake 404") + raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) else: result = [] @@ -37,14 +39,14 @@ def findByArgs(self, args): def findProperty(self, prop_name): if not self.connected: - raise StandardError("fake 404") + raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) else: # print "findProperty: ", prop_name pass def set(self, channels): if not self.connected or self.fail_set: # if not fail_set? - raise StandardError("fake 404") + raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) else: #print "channels:\n", channels for channel in channels: @@ -62,17 +64,12 @@ def __init__(self): def get(self, name, target): return "cf-update" -class mock_src(): - def __init__(self): - self.host = '10.0.2.15' - self.port = 43891 - class mock_TR(): def __init__(self): #self.addrec = {5570560: ('test:lo', 'longout'), 5636096: ('test:Msg-I', 'stringin'), 5701632: ('test:li', 'longin'), 5767168: ('test:State-Sts', 'mbbi')} #self.addrec = {1: ('name', 'longout')} self.addrec = {} - self.src = mock_src() + self.src = IPv4Address('TCP', 'testhosta', 1111) self.delrec = () self.infos = {'CF_USERNAME': 'cf-update', 'ENGINEER': 'cf-engi'} self.initial = True diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index c35e83c0..1a9dd01a 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -88,7 +88,6 @@ def timed(self): self.nonce = random.randint(0,0xffffffff) self.writeMsg(0x8002, _ping.pack(self.nonce)) _log.debug("ping nonce: " + str(self.nonce)) - self.sess.proto.transport.resumeProducing() def getInitialState(self): return (self.recvHeader, 8) @@ -244,7 +243,7 @@ def flush(self, connected=True): op = self.factory.commit(TR) if op: op.addCallbacks(self.resume, self.abort) - self.proto.transport.pauseProducing() + #self.proto.transport.pauseProducing() self.op = op def resume(self, arg): diff --git a/server/recceiver/scripts/test_mock_iocs.py b/server/recceiver/scripts/test_mock_iocs.py index 9062a31b..e33bf08c 100644 --- a/server/recceiver/scripts/test_mock_iocs.py +++ b/server/recceiver/scripts/test_mock_iocs.py @@ -33,11 +33,13 @@ def main(): pids = [] signal.signal(signal.SIGTERM, handler) os.chdir(os.path.dirname(os.path.abspath(sys.argv[0]))) # Uses a filename, not good, also only works on linux? - for i in range(0, 999): + threads = [] + for i in range(0, 899): iocpid, iocfd = startIOC() pids.append(iocpid) print "len pids: ", len(pids) iocthread = threading.Thread(group=None, target=readfd, args=(iocfd,), name="iocthread", kwargs={}) + threads.append(iocthread) iocthread.start() try: while 1: diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py index dea6c201..c3eccc19 100644 --- a/server/recceiver/test_cfstore.py +++ b/server/recceiver/test_cfstore.py @@ -27,38 +27,38 @@ def test_3_iocs(self): TR1.src.port = 1111 TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} deferred = yield self.cfstore.commit(TR1) - self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) TR2 = mock_TR() TR2.src.host = 'testhostb' TR2.src.port = 2222 TR2.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} deferred = yield self.cfstore.commit(TR2) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) TR3 = mock_TR() TR3.src.host = 'testhostc' TR3.src.port = 3333 TR3.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} deferred = yield self.cfstore.commit(TR3) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) TR4 = mock_TR() TR4.initial = False @@ -67,28 +67,28 @@ def test_3_iocs(self): TR4.src.host = 'testhostc' TR4.src.port = 3333 deferred = yield self.cfstore.commit(TR4) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) TR5 = mock_TR() TR5.src.host = 'testhostc' TR5.src.port = 3333 TR5.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} deferred = yield self.cfstore.commit(TR5) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) TR6 = mock_TR() TR6.initial = False @@ -97,14 +97,14 @@ def test_3_iocs(self): TR6.src.host = 'testhostb' TR6.src.port = 2222 deferred = yield self.cfstore.commit(TR6) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) TR7 = mock_TR() TR7.initial = False @@ -113,14 +113,14 @@ def test_3_iocs(self): TR7.src.host = 'testhosta' TR7.src.port = 1111 deferred = yield self.cfstore.commit(TR7) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) TR8 = mock_TR() TR8.initial = False @@ -129,42 +129,42 @@ def test_3_iocs(self): TR8.src.host = 'testhostc' TR8.src.port = 3333 deferred = yield self.cfstore.commit(TR8) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Inactive'), - u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Inactive'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Inactive'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Inactive'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) TR9 = mock_TR() TR9.src.host = 'testhostb' TR9.src.port = 2222 TR9.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} deferred = yield self.cfstore.commit(TR9) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), - u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) TR10 = mock_TR() TR10.src.host = 'testhosta' TR10.src.port = 1111 TR10.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 4: ('ch4', 'stringin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} deferred = yield self.cfstore.commit(TR10) - self.assertCommit([u'ch1', u'ch2', u'ch3', u'ch4', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), - u'ch2': abbr(u'ch2', 'testhosta', 1111, 'Active'), - u'ch3': abbr(u'ch3', 'testhosta', 1111, 'Active'), - u'ch4': abbr(u'ch4', 'testhosta', 1111, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch2': abbr(u'ch2', 'testhosta', 1111, 'Active'), + u'ch3': abbr(u'ch3', 'testhosta', 1111, 'Active'), + u'ch4': abbr(u'ch4', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) @defer.inlineCallbacks def test_no_CFS(self): @@ -176,13 +176,14 @@ def test_no_CFS(self): TR1.src.host = 'testhosta' TR1.src.port = 1111 TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - threading._start_new_thread(self.simulate_reconnect, ()) + rcon_thread = threading.Thread(target=self.simulate_reconnect, args=()) + rcon_thread.start() deferred = yield self.cfstore.commit(TR1) - self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) @defer.inlineCallbacks def test_set_fail(self): @@ -195,23 +196,23 @@ def test_set_fail(self): TR1.src.host = 'testhosta' TR1.src.port = 1111 TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - threading._start_new_thread(self.simulate_reconnect, ()) + rcon_thread = threading.Thread(target=self.simulate_reconnect, args=()) + rcon_thread.start() deferred = yield self.cfstore.commit(TR1) - self.assertCommit([u'ch1', u'ch5', u'ch6', u'ch7'], - {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) - - def assertCommit(self, keys, dict): - self.assertSequenceEqual(self.cfstore.client.cf.keys(), keys) + def assertCommit(self, dict): self.assertDictEqual(self.cfstore.client.cf, dict) def simulate_reconnect(self): time.sleep(3) - self.cfclient.connected = True - self.cfclient.fail_set = False + self.cfclient.connected = True # There is the potential for a race condition here + self.cfclient.fail_set = False # This would cause a connection failure and re-polling at a different point + def abbr(name, hostname, iocname, status): return {u'owner': 'cf-update', From 8bbc7ab58536c48dd74d53ab9db9ac799363a145 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Wed, 7 Sep 2016 08:00:38 -0700 Subject: [PATCH 08/13] Added cleaning methods to mock client, added tests for clean_service to unit tests. Changed format of IOCID to match possible new system. Updated owner information. Clean service now uses the update method instead of the set methods. Note: This overwrites ownership in some locations. --- server/recceiver/cfstore.py | 14 +++++----- server/recceiver/mock_client.py | 30 +++++++++++++++++---- server/recceiver/test_cfstore.py | 46 +++++++++++++++++++++++++------- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 4ace2fb3..3c16da66 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -49,7 +49,7 @@ def startService(self): # The usr, username, and password are provided by the channelfinder._conf module. if self.client is None: # For setting up mock test client self.client = ChannelFinderClient() - self.clean_service() + self.clean_service() def stopService(self): service.Service.stopService(self) @@ -73,7 +73,7 @@ def __commit__(self, TR): delrec = TR.delrec iocName = TR.src.port hostName = TR.src.host - iocid = hostName + str(iocName) + iocid = hostName + ":" + str(iocName) owner = TR.infos.get('CF_USERNAME') or TR.infos.get('ENGINEER') or self.conf.get('username', 'cfstore') time = self.currentTime() if TR.initial: @@ -120,10 +120,11 @@ def __commit__(self, TR): _log.error('failed to initialize one or more of the following properties' + 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) #unlock in wrapper function - dict_to_file(self.channel_dict, self.iocs) + #dict_to_file(self.channel_dict, self.iocs) def clean_service(self): sleep = 1 + owner = self.conf.get('username', 'cfstore') while 1: try: if _log.isEnabledFor(logging.DEBUG): @@ -133,9 +134,10 @@ def clean_service(self): #_log.debug("chs: " + str(channels)) new_channels = [] for ch in channels or []: - new_channels.append(clean_channel(ch)) + new_channels.append(ch[u'name']) if len(new_channels) > 0: - self.client.set(channels=new_channels) + self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, + channelNames=new_channels) return except HTTPError as e: @@ -146,7 +148,7 @@ def clean_service(self): sleep *= 1.5 def dict_to_file(dict, iocs): - filename = "/home/devuser/recsyncdata" + filename = "/home/devuser/recsyncdata" # TODO: change if os.path.isfile(filename): os.remove(filename) list = [] diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index a46f455e..55d97584 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -4,11 +4,12 @@ class mock_client(): def __init__(self): self.cf = {} self.connected = True + self.fail_find = False self.fail_set = False def findByArgs(self, args): if not self.connected: - raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: result = [] @@ -39,23 +40,40 @@ def findByArgs(self, args): def findProperty(self, prop_name): if not self.connected: - raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: # print "findProperty: ", prop_name pass def set(self, channels): - if not self.connected or self.fail_set: # if not fail_set? - raise HTTPError("Mock ChannelfinderClient HTTPError", response=self) + if not self.connected or self.fail_set: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: #print "channels:\n", channels for channel in channels: self.addChannel(channel) #print "CF:\n", self.cf + def update(self, property, channelNames): + print "update" + if not self.connected or self.fail_find: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) + else: + for channel in channelNames: + self.__updateChannelWithProp(property, channel) + def addChannel(self, channel): self.cf[channel[u'name']] = channel + def __updateChannelWithProp(self, property, channel): + print "p: ", property + print "c: ", channel + if channel in self.cf: + for prop in self.cf[channel]['properties']: + if prop['name'] == property['name']: + prop['value'] = property['value'] + prop['owner'] = property['owner'] # also overwriting owner because that's what CF does + return class mock_conf(): def __init__(self): @@ -73,4 +91,6 @@ def __init__(self): self.delrec = () self.infos = {'CF_USERNAME': 'cf-update', 'ENGINEER': 'cf-engi'} self.initial = True - self.connected = True \ No newline at end of file + self.connected = True + self.fail_set = False + self.fail_find = False \ No newline at end of file diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py index c3eccc19..2479cc8b 100644 --- a/server/recceiver/test_cfstore.py +++ b/server/recceiver/test_cfstore.py @@ -19,7 +19,6 @@ def setUp(self): @defer.inlineCallbacks def test_3_iocs(self): self.cfclient = mock_client() - self.cfclient.connected = True self.cfstore.client = self.cfclient self.cfstore.startService() TR1 = mock_TR() @@ -63,7 +62,7 @@ def test_3_iocs(self): TR4 = mock_TR() TR4.initial = False TR4.addrec = {} - TR4.connected = False + TR4.connected = False # simulated IOC Disconnect TR4.src.host = 'testhostc' TR4.src.port = 3333 deferred = yield self.cfstore.commit(TR4) @@ -93,7 +92,7 @@ def test_3_iocs(self): TR6 = mock_TR() TR6.initial = False TR6.addrec = {} - TR6.connected = False + TR6.connected = False # simulated IOC Disconnect TR6.src.host = 'testhostb' TR6.src.port = 2222 deferred = yield self.cfstore.commit(TR6) @@ -109,7 +108,7 @@ def test_3_iocs(self): TR7 = mock_TR() TR7.initial = False TR7.addrec = {} - TR7.connected = False + TR7.connected = False # simulated IOC Disconnect TR7.src.host = 'testhosta' TR7.src.port = 1111 deferred = yield self.cfstore.commit(TR7) @@ -125,7 +124,7 @@ def test_3_iocs(self): TR8 = mock_TR() TR8.initial = False TR8.addrec = {} - TR8.connected = False + TR8.connected = False # simulated IOC Disconnect TR8.src.host = 'testhostc' TR8.src.port = 3333 deferred = yield self.cfstore.commit(TR8) @@ -171,13 +170,13 @@ def test_no_CFS(self): self.cfclient = mock_client() self.cfclient.connected = False self.cfstore.client = self.cfclient + rcon_thread = threading.Timer(2, self.simulate_reconnect) + rcon_thread.start() self.cfstore.startService() TR1 = mock_TR() TR1.src.host = 'testhosta' TR1.src.port = 1111 TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - rcon_thread = threading.Thread(target=self.simulate_reconnect, args=()) - rcon_thread.start() deferred = yield self.cfstore.commit(TR1) self.assertDictEqual(self.cfstore.client.cf, {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), @@ -188,7 +187,6 @@ def test_no_CFS(self): @defer.inlineCallbacks def test_set_fail(self): self.cfclient = mock_client() - self.cfclient.connected = True self.cfclient.fail_set = True self.cfstore.client = self.cfclient self.cfstore.startService() @@ -196,7 +194,7 @@ def test_set_fail(self): TR1.src.host = 'testhosta' TR1.src.port = 1111 TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - rcon_thread = threading.Thread(target=self.simulate_reconnect, args=()) + rcon_thread = threading.Timer(2, self.simulate_reconnect) rcon_thread.start() deferred = yield self.cfstore.commit(TR1) self.assertDictEqual(self.cfstore.client.cf, @@ -205,13 +203,41 @@ def test_set_fail(self): u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + @defer.inlineCallbacks + def test_clean_service(self): + self.cfclient = mock_client() + self.cfclient.fail_find = True + self.cfclient.cf = {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch5': abbr(u'ch5', 'testhostb', 2222, 'Active'), + u'ch6': abbr(u'ch6', 'testhostb', 2222, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')} + self.cfstore.client = self.cfclient + rcon_thread = threading.Timer(2, self.simulate_reconnect) + rcon_thread.start() + self.cfstore.startService() + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhostb', 2222, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostb', 2222, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Inactive')}) + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR1) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + def assertCommit(self, dict): self.assertDictEqual(self.cfstore.client.cf, dict) def simulate_reconnect(self): - time.sleep(3) self.cfclient.connected = True # There is the potential for a race condition here self.cfclient.fail_set = False # This would cause a connection failure and re-polling at a different point + self.cfclient.fail_find = False def abbr(name, hostname, iocname, status): From 3120e1693848321ea32ca9b1ff438530e2a97c8c Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Thu, 8 Sep 2016 12:05:58 -0700 Subject: [PATCH 09/13] Added readme to describe and monitor the status of manual tests. Fixed local path in test_mock_iocs and decreased the number of iocs spawned by default. Cleaned up the dictionary logic in the commit method. Added config control to dict_to_file. --- server/recceiver/cfstore.py | 94 ++++++++++------------ server/recceiver/scripts/readme | 74 +++++++++++++++++ server/recceiver/scripts/test_mock_iocs.py | 7 +- 3 files changed, 121 insertions(+), 54 deletions(-) create mode 100644 server/recceiver/scripts/readme diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 3c16da66..f04e3894 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -70,7 +70,7 @@ def __commit__(self, TR): if _log.isEnabledFor(logging.DEBUG): _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] - delrec = TR.delrec + delrec = list(TR.delrec) or [] iocName = TR.src.port hostName = TR.src.host iocid = hostName + ":" + str(iocName) @@ -78,41 +78,30 @@ def __commit__(self, TR): time = self.currentTime() if TR.initial: self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "owner": owner, "channelcount": 0} # add IOC to source list - if TR.connected: - for pv in pvNames: - if pv in self.channel_dict: - if iocid not in self.channel_dict[pv]: - self.channel_dict[pv].append(iocid) # add iocname to pvName in dict - self.iocs[iocid]["channelcount"] += 1 - # else: # info already in dictionary, possibly recovering from ioc disconnect - else: - self.channel_dict[pv] = [iocid] # add pvName with [iocname] in dict + if not TR.connected: + if delrec: + delrec = delrec.append(self.channel_dict.keys()) + else: + delrec = self.channel_dict.keys() + for pv in pvNames: + if pv in self.channel_dict: + if iocid not in self.channel_dict[pv]: + self.channel_dict[pv].append(iocid) # add iocname to pvName in dict self.iocs[iocid]["channelcount"] += 1 - for pv in delrec: - if iocid in self.channel_dict[pv]: - self.channel_dict[pv].remove(iocid) - self.iocs[iocid]["channelcount"] -= 1 - if self.iocs[iocid]['channelcount'] == 0: - self.iocs.pop(iocid, None) - elif self.iocs[iocid]['channelcount'] < 0: - _log.error("channel count negative!") - if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs - del self.channel_dict[pv] - #pvNames.remove(pv) - else: # CASE: IOC Disconnected - keys = self.channel_dict.keys() - for ch in keys: - if iocid in self.channel_dict[ch]: - self.channel_dict[ch].remove(iocid) - if len(self.channel_dict[ch]) <= 0: # case: channel has no more iocs - del self.channel_dict[ch] - self.iocs[iocid]['channelcount'] -= 1 - if self.iocs[iocid]['channelcount'] == 0: - self.iocs.pop(iocid, None) - elif self.iocs[iocid]['channelcount'] < 0: - _log.error("channel count negative!") - # else: - # pass # ch not connected to ioc? + # else: # info already in dictionary, possibly recovering from ioc disconnect + else: + self.channel_dict[pv] = [iocid] # add pvName with [iocname] in dict + self.iocs[iocid]["channelcount"] += 1 + for pv in delrec: + if iocid in self.channel_dict[pv]: + self.channel_dict[pv].remove(iocid) + self.iocs[iocid]["channelcount"] -= 1 + if self.iocs[iocid]['channelcount'] == 0: + self.iocs.pop(iocid, None) + elif self.iocs[iocid]['channelcount'] < 0: + _log.error("channel count negative!") + if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs + del self.channel_dict[pv] if iocName and hostName and owner: poll(__updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) @@ -120,10 +109,11 @@ def __commit__(self, TR): _log.error('failed to initialize one or more of the following properties' + 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) #unlock in wrapper function - #dict_to_file(self.channel_dict, self.iocs) + dict_to_file(self.channel_dict, self.iocs, self.conf) def clean_service(self): sleep = 1 + retry_limit = 5 owner = self.conf.get('username', 'cfstore') while 1: try: @@ -139,26 +129,28 @@ def clean_service(self): self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, channelNames=new_channels) return + except HTTPError: + _log.exception("cleaning failed, retrying: ") - except HTTPError as e: - _log.error("cleaning failed, retrying: " + str(e.message)) + time.sleep(min(60, sleep)) + sleep *= 1.5 + if self.running == 0 and sleep >= retry_limit: + return - finally: - time.sleep(min(60, sleep)) - sleep *= 1.5 -def dict_to_file(dict, iocs): - filename = "/home/devuser/recsyncdata" # TODO: change - if os.path.isfile(filename): - os.remove(filename) - list = [] - for key in dict: - list.append([key, iocs[dict[key][-1]]['hostname'], iocs[dict[key][-1]]['iocname']]) +def dict_to_file(dict, iocs, conf): + filename = conf.get('debug_file_loc', None) + if filename: + if os.path.isfile(filename): + os.remove(filename) + list = [] + for key in dict: + list.append([key, iocs[dict[key][-1]]['hostname'], iocs[dict[key][-1]]['iocname']]) - list.sort(key=itemgetter(0)) + list.sort(key=itemgetter(0)) - with open(filename, 'wrx') as f: - json.dump(list, f) + with open(filename, 'wrx') as f: + json.dump(list, f) def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, time, owner): diff --git a/server/recceiver/scripts/readme b/server/recceiver/scripts/readme new file mode 100644 index 00000000..a94a4b14 --- /dev/null +++ b/server/recceiver/scripts/readme @@ -0,0 +1,74 @@ +MANUAL TEST LOG: + +PASSED - Clean and poll on recsync start +PASSED - Clean and poll on recsync close +PASSED - Clean and poll on IOC start +PASSED - Clean and poll on IOC close +PASSED - 100 IOCs overlapping 1 IOC + +Clean and poll on recsync start: +0.a CF on +0.b add_extra_properties + - should see the new props in CF +1. CF off +2. Start recsync + - will poll in clean +3. Enable CF + - polling completes +4. CF will clean +5. Check result + - all channels inactive + +Clean and poll on recsync close: +1. CF on +2. Start recsync +3. add_extra_properties + - should see new props in CF +4. CF off +5. Recsync off + - will poll in clean + - (NYI) will give up if waiting too long +6. CF on +7. Check result + - all channels inactive + +Commit poll on IOC start: +1. Enable CF +2. Start recsync + - will perform clean +3. Disable CF +4. Start IOC + - waits for recsync announcement + - connects and polls commit +5. Enable CF +6. Commit poll fails once, reconnects, and commits +7. Check result + - all channels active + +Commit poll on IOC close: +1. Enable CF +2. Start Recsync +3. Start IOC +4. Wait for IOC to connect and commit + - all IOC channels now active +5. Disable CF +6. Close IOC + - commits [] and polls +7. Enable CF + - reconnects after polling and completes commit +8. Check result + - all channels inactive + +100 IOCs overlapping 1 IOC: +1. Enable CF +2. Start recsync +3. Start st.cmd in a command prompt + - IOC will connect and commit channels + - 4 channels will be active +4. run test_mock_iocs.py + - wait forever + - all channels will be active + - 2 channels will still belong to st.cmd +5. kill test_mock_iocs.py or killall st_test.cmd + - wait forever again + - 4 channels will be active, all belonging to st.cmd \ No newline at end of file diff --git a/server/recceiver/scripts/test_mock_iocs.py b/server/recceiver/scripts/test_mock_iocs.py index e33bf08c..e389fbc7 100644 --- a/server/recceiver/scripts/test_mock_iocs.py +++ b/server/recceiver/scripts/test_mock_iocs.py @@ -11,8 +11,9 @@ def startIOC(): # conf needs to be set pid, fd = os.forkpty() if pid == 0: - os.chdir("/home/devuser/git/skinner/recsync/client/iocBoot/iocdemo") - os.execv("st.cmd", ['']) + os.chdir("../../../client/iocBoot/iocdemo") + print os.curdir + os.execv("st_test.cmd", ['']) return pid, fd @@ -34,7 +35,7 @@ def main(): signal.signal(signal.SIGTERM, handler) os.chdir(os.path.dirname(os.path.abspath(sys.argv[0]))) # Uses a filename, not good, also only works on linux? threads = [] - for i in range(0, 899): + for i in range(1, 100): iocpid, iocfd = startIOC() pids.append(iocpid) print "len pids: ", len(pids) From f60784229db3169d97fab6137d5b82493cf74cc5 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 12 Sep 2016 12:53:20 -0700 Subject: [PATCH 10/13] Added new test IOC "st_test.cmd" with 1000 channels. Added script for adding active channels for manual tests. Added readme to describe manual test process. Added debug logging for cleaning stages. --- client/iocBoot/iocdemo/st_test.cmd | 23 +++++++++++++++++++ server/recceiver/cfstore.py | 18 ++++++++++----- server/recceiver/mock_client.py | 11 ++------- .../recceiver/scripts/add_extra_properties.py | 23 +++++++++++++++++++ server/recceiver/scripts/readme | 1 + 5 files changed, 61 insertions(+), 15 deletions(-) create mode 100755 client/iocBoot/iocdemo/st_test.cmd create mode 100644 server/recceiver/scripts/add_extra_properties.py diff --git a/client/iocBoot/iocdemo/st_test.cmd b/client/iocBoot/iocdemo/st_test.cmd new file mode 100755 index 00000000..4767834c --- /dev/null +++ b/client/iocBoot/iocdemo/st_test.cmd @@ -0,0 +1,23 @@ +#!../../bin/linux-x86_64-debug/demo + +## You may have to change demo to something else +## everywhere it appears in this file + +< envPaths + +## Register all support components +dbLoadDatabase("../../dbd/demo.dbd",0,0) +demo_registerRecordDeviceDriver(pdbbase) + +var(reccastTimeout, 5.0) +var(reccastMaxHoldoff, 5.0) + +epicsEnvSet("IOCNAME", "myioc") +epicsEnvSet("ENGINEER", "myself") +epicsEnvSet("LOCATION", "myplace") + + +## Load record instances +dbLoadRecords("../../db/cfstore_test_records.db", "P=test:") + +iocInit() diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index f04e3894..bde77f84 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -118,23 +118,26 @@ def clean_service(self): while 1: try: if _log.isEnabledFor(logging.DEBUG): - _log.debug("cleaning...") + _log.debug("Cleaning service...") channels = self.client.findByArgs([('pvStatus', 'Active')]) if channels is not None: - #_log.debug("chs: " + str(channels)) new_channels = [] for ch in channels or []: new_channels.append(ch[u'name']) if len(new_channels) > 0: self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, channelNames=new_channels) + if _log.isEnabledFor(logging.DEBUG): + _log.debug("Service clean.") return - except HTTPError: + except: # needs to catch non HTTPError for when glassfish is down _log.exception("cleaning failed, retrying: ") time.sleep(min(60, sleep)) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: + if _log.isEnabledFor(logging.DEBUG): + _log.debug("Abandoning clean.") return @@ -280,8 +283,9 @@ def checkPropertiesExist(client, propOwner): if client.findProperty(propName) is None: try: client.set(property={u'name': propName, u'owner': propOwner}) - except Exception as e: - _log.error('Failed to create the property %s: %s', propName, e) + except Exception: + _log.exception('Failed to create the property %s', propName) + raise def getCurrentTime(): @@ -289,6 +293,8 @@ def getCurrentTime(): def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): + if _log.isEnabledFor(logging.DEBUG): + _log.debug("Polling begin: ") sleep = 1 success = False while not success: @@ -296,7 +302,7 @@ def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, ti update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) success = True return success - except HTTPError as e: # should catch only network errors + except StandardError as e: # needs to catch non HTTP errors when glassfish is not active if _log.isEnabledFor(logging.DEBUG): _log.debug("error: " + str(e.message)) _log.debug("SLEEP: " + str(min(60, sleep))) diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index 55d97584..c992c5d1 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -42,20 +42,17 @@ def findProperty(self, prop_name): if not self.connected: raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: - # print "findProperty: ", prop_name - pass + if prop_name in ['hostName', 'iocName', 'pvStatus', 'time']: + return prop_name def set(self, channels): if not self.connected or self.fail_set: raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: - #print "channels:\n", channels for channel in channels: self.addChannel(channel) - #print "CF:\n", self.cf def update(self, property, channelNames): - print "update" if not self.connected or self.fail_find: raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: @@ -66,8 +63,6 @@ def addChannel(self, channel): self.cf[channel[u'name']] = channel def __updateChannelWithProp(self, property, channel): - print "p: ", property - print "c: ", channel if channel in self.cf: for prop in self.cf[channel]['properties']: if prop['name'] == property['name']: @@ -84,8 +79,6 @@ def get(self, name, target): class mock_TR(): def __init__(self): - #self.addrec = {5570560: ('test:lo', 'longout'), 5636096: ('test:Msg-I', 'stringin'), 5701632: ('test:li', 'longin'), 5767168: ('test:State-Sts', 'mbbi')} - #self.addrec = {1: ('name', 'longout')} self.addrec = {} self.src = IPv4Address('TCP', 'testhosta', 1111) self.delrec = () diff --git a/server/recceiver/scripts/add_extra_properties.py b/server/recceiver/scripts/add_extra_properties.py new file mode 100644 index 00000000..1cb0e119 --- /dev/null +++ b/server/recceiver/scripts/add_extra_properties.py @@ -0,0 +1,23 @@ +from channelfinder import ChannelFinderClient + +''' +Simple script for adding active channels to Channel Finder Service for testing cf-store clean +If it gives a 500 error, run it again. Glassfish and CFS must be set up and running. +''' + + +def abbr(name, hostname, iocname, status): + return {u'owner': 'cf-update', + u'name': name, + u'properties': [ + {u'owner': 'cf-update', u'name': 'hostName', + u'value': hostname}, + {u'owner': 'cf-update', u'name': 'iocName', + u'value': iocname}, + {u'owner': 'cf-update', u'name': 'pvStatus', + u'value': status}, + {u'owner': 'cf-update', u'name': 'time', + u'value': '2016-08-18 12:33:09.953985'}]} + +client = ChannelFinderClient() +client.set(channels=[abbr(u'ch1', 'testhosta', 1111, 'Active'), abbr(u'test_channel', 'testhosta', 1111, 'Active')]) diff --git a/server/recceiver/scripts/readme b/server/recceiver/scripts/readme index a94a4b14..c04ee1e4 100644 --- a/server/recceiver/scripts/readme +++ b/server/recceiver/scripts/readme @@ -60,6 +60,7 @@ Commit poll on IOC close: - all channels inactive 100 IOCs overlapping 1 IOC: +0. * Disable logging in demo.conf * 1. Enable CF 2. Start recsync 3. Start st.cmd in a command prompt From 8109404a51d46c73832766af81ae49f8714e0b71 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 12 Sep 2016 12:54:53 -0700 Subject: [PATCH 11/13] Added cf-store section to conf file, commented out debug file production --- server/demo.conf | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/demo.conf b/server/demo.conf index 05e133bd..1effde5f 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -56,3 +56,10 @@ dbname = test.db # Must be unique for each instance accessing # a common database. idkey = 42 + +[cf] +# cf-store application + +# Debug output file location. +# Produces no file when not defined. +# debug_file_loc = /home/devuser/recsyncdata.txt \ No newline at end of file From c3046a23f97e0810a704effe028d73dc7f649260 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 12 Sep 2016 13:52:42 -0700 Subject: [PATCH 12/13] Now handling all instances of RequestException. Cleaned up dictionary logic with collections.defaultdict. --- server/recceiver/cfstore.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index bde77f84..4f9b7f32 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -2,13 +2,14 @@ import logging _log = logging.getLogger(__name__) -from requests import HTTPError +from requests import RequestException from zope.interface import implements from twisted.application import service from twisted.internet.threads import deferToThread from twisted.internet.defer import DeferredLock from twisted.internet import defer from operator import itemgetter +from collections import defaultdict import time import interfaces import datetime @@ -34,7 +35,7 @@ class CFProcessor(service.Service): def __init__(self, name, conf): _log.info("CF_INIT %s", name) self.name, self.conf = name, conf - self.channel_dict = dict() + self.channel_dict = defaultdict(list) self.iocs = dict() self.client = None self.currentTime = getCurrentTime @@ -70,7 +71,7 @@ def __commit__(self, TR): if _log.isEnabledFor(logging.DEBUG): _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] - delrec = list(TR.delrec) or [] + delrec = list(TR.delrec) iocName = TR.src.port hostName = TR.src.host iocid = hostName + ":" + str(iocName) @@ -79,19 +80,10 @@ def __commit__(self, TR): if TR.initial: self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "owner": owner, "channelcount": 0} # add IOC to source list if not TR.connected: - if delrec: - delrec = delrec.append(self.channel_dict.keys()) - else: - delrec = self.channel_dict.keys() + delrec.extend(self.channel_dict.keys()) for pv in pvNames: - if pv in self.channel_dict: - if iocid not in self.channel_dict[pv]: - self.channel_dict[pv].append(iocid) # add iocname to pvName in dict - self.iocs[iocid]["channelcount"] += 1 - # else: # info already in dictionary, possibly recovering from ioc disconnect - else: - self.channel_dict[pv] = [iocid] # add pvName with [iocname] in dict - self.iocs[iocid]["channelcount"] += 1 + self.channel_dict[pv].append(iocid) # add iocname to pvName in dict + self.iocs[iocid]["channelcount"] += 1 for pv in delrec: if iocid in self.channel_dict[pv]: self.channel_dict[pv].remove(iocid) @@ -130,7 +122,7 @@ def clean_service(self): if _log.isEnabledFor(logging.DEBUG): _log.debug("Service clean.") return - except: # needs to catch non HTTPError for when glassfish is down + except RequestException: _log.exception("cleaning failed, retrying: ") time.sleep(min(60, sleep)) @@ -302,7 +294,7 @@ def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, ti update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) success = True return success - except StandardError as e: # needs to catch non HTTP errors when glassfish is not active + except RequestException as e: if _log.isEnabledFor(logging.DEBUG): _log.debug("error: " + str(e.message)) _log.debug("SLEEP: " + str(min(60, sleep))) From d6cb172110c50247ff44f75cefa7d95aa18a1401 Mon Sep 17 00:00:00 2001 From: mskinner5278 Date: Mon, 12 Sep 2016 14:01:41 -0700 Subject: [PATCH 13/13] Removed useless code and explicit log level checking. --- server/recceiver/cfstore.py | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 4f9b7f32..ed73f489 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -68,8 +68,7 @@ def commit(self, transaction_record): self.lock.release() def __commit__(self, TR): - if _log.isEnabledFor(logging.DEBUG): - _log.debug("CF_COMMIT %s", TR.infos.items()) + _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] delrec = list(TR.delrec) iocName = TR.src.port @@ -94,13 +93,7 @@ def __commit__(self, TR): _log.error("channel count negative!") if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs del self.channel_dict[pv] - - if iocName and hostName and owner: - poll(__updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) - else: - _log.error('failed to initialize one or more of the following properties' + - 'hostname: %s iocname: %s owner: %s', hostName, iocName, owner) - #unlock in wrapper function + poll(__updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) dict_to_file(self.channel_dict, self.iocs, self.conf) def clean_service(self): @@ -109,8 +102,7 @@ def clean_service(self): owner = self.conf.get('username', 'cfstore') while 1: try: - if _log.isEnabledFor(logging.DEBUG): - _log.debug("Cleaning service...") + _log.debug("Cleaning service...") channels = self.client.findByArgs([('pvStatus', 'Active')]) if channels is not None: new_channels = [] @@ -119,8 +111,7 @@ def clean_service(self): if len(new_channels) > 0: self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, channelNames=new_channels) - if _log.isEnabledFor(logging.DEBUG): - _log.debug("Service clean.") + _log.debug("Service clean.") return except RequestException: _log.exception("cleaning failed, retrying: ") @@ -128,8 +119,7 @@ def clean_service(self): time.sleep(min(60, sleep)) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - if _log.isEnabledFor(logging.DEBUG): - _log.debug("Abandoning clean.") + _log.debug("Abandoning clean.") return @@ -285,8 +275,7 @@ def getCurrentTime(): def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): - if _log.isEnabledFor(logging.DEBUG): - _log.debug("Polling begin: ") + _log.debug("Polling begin: ") sleep = 1 success = False while not success: @@ -295,10 +284,9 @@ def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, ti success = True return success except RequestException as e: - if _log.isEnabledFor(logging.DEBUG): - _log.debug("error: " + str(e.message)) - _log.debug("SLEEP: " + str(min(60, sleep))) - _log.debug(str(channels_dict)) + _log.debug("error: " + str(e.message)) + _log.debug("SLEEP: " + str(min(60, sleep))) + _log.debug(str(channels_dict)) time.sleep(min(60, sleep)) sleep *= 1.5