diff --git a/client/iocBoot/iocdemo/st_test.cmd b/client/iocBoot/iocdemo/st_test.cmd new file mode 100755 index 00000000..4767834c --- /dev/null +++ b/client/iocBoot/iocdemo/st_test.cmd @@ -0,0 +1,23 @@ +#!../../bin/linux-x86_64-debug/demo + +## You may have to change demo to something else +## everywhere it appears in this file + +< envPaths + +## Register all support components +dbLoadDatabase("../../dbd/demo.dbd",0,0) +demo_registerRecordDeviceDriver(pdbbase) + +var(reccastTimeout, 5.0) +var(reccastMaxHoldoff, 5.0) + +epicsEnvSet("IOCNAME", "myioc") +epicsEnvSet("ENGINEER", "myself") +epicsEnvSet("LOCATION", "myplace") + + +## Load record instances +dbLoadRecords("../../db/cfstore_test_records.db", "P=test:") + +iocInit() diff --git a/server/demo.conf b/server/demo.conf index 05e133bd..1effde5f 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -56,3 +56,10 @@ dbname = test.db # Must be unique for each instance accessing # a common database. idkey = 42 + +[cf] +# cf-store application + +# Debug output file location. +# Produces no file when not defined. +# debug_file_loc = /home/devuser/recsyncdata.txt \ No newline at end of file diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 9f5909fd..ed73f489 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -2,11 +2,19 @@ import logging _log = logging.getLogger(__name__) - +from requests import RequestException from zope.interface import implements from twisted.application import service +from twisted.internet.threads import deferToThread +from twisted.internet.defer import DeferredLock +from twisted.internet import defer +from operator import itemgetter +from collections import defaultdict +import time import interfaces import datetime +import os +import json # ITRANSACTION FORMAT: # @@ -15,20 +23,24 @@ # delrec = a set() of records which are being removed # infos = dictionary of client infos # recinfos = additional infos being added to existing records -# "recid: {key:vale}" +# "recid: {key:value}" # -__all__ = ['CFProcessor'] +__all__ = ['CFProcessor'] class CFProcessor(service.Service): implements(interfaces.IProcessor) def __init__(self, name, conf): _log.info("CF_INIT %s", name) - self.name,self.conf = name,conf - - + self.name, self.conf = name, conf + self.channel_dict = defaultdict(list) + self.iocs = dict() + self.client = None + self.currentTime = getCurrentTime + self.lock = DeferredLock() + def startService(self): service.Service.startService(self) self.running = 1 @@ -36,137 +48,212 @@ def startService(self): from channelfinder import ChannelFinderClient # Using the default python cf-client. # The usr, username, and password are provided by the channelfinder._conf module. - self.client = ChannelFinderClient() - + if self.client is None: # For setting up mock test client + self.client = ChannelFinderClient() + self.clean_service() + def stopService(self): service.Service.stopService(self) #Set channels to inactive and close connection to client self.running = 0 + self.clean_service() _log.info("CF_STOP") - def commit(self, TR): - if _log.isEnabledFor(logging.DEBUG): - _log.debug("CF_COMMIT %s", TR.infos.items()) + @defer.inlineCallbacks + def commit(self, transaction_record): + yield self.lock.acquire() + try: + yield deferToThread(self.__commit__, transaction_record) + finally: + self.lock.release() + + def __commit__(self, TR): + _log.debug("CF_COMMIT %s", TR.infos.items()) pvNames = [unicode(rname, "utf-8") for rid, (rname, rtype) in TR.addrec.iteritems()] - iocName=TR.src.port - hostName=TR.src.host - - ''' - Currently using the hostIP and the iocPort - if 'IOCNAME' in TR.infos: - iocName = TR.infos['IOCNAME'] - if 'HOSTNAME' in TR.infos: - hostName = TR.infos['HOSTNAME'] - ''' + delrec = list(TR.delrec) + iocName = TR.src.port + hostName = TR.src.host + iocid = hostName + ":" + str(iocName) owner = TR.infos.get('CF_USERNAME') or TR.infos.get('ENGINEER') or self.conf.get('username', 'cfstore') - time = str(datetime.datetime.now()) - - if iocName and hostName and owner: - updateChannelFinder(self.client, pvNames, hostName, iocName, time, owner) - else: - _log.error('failed to initialize one or more of the following properties'+ - 'hostname: %s iocname: %s owner: %s',hostName,iocName,owner) - -def updateChannelFinder(client, pvNames, hostName, iocName, time, owner): - ''' - pvNames = list of pvNames - ([] permitted will effectively remove the hostname, iocname from all channels) - hostName = pv hostName (None not permitted) - iocName = pv iocName (None not permitted) - owner = the owner of the channels and properties being added, this can be different from the user - e.g. user = abc might create a channel with owner = group-abc - time = the time at which these channels are being created/modified - ''' - if hostName == None or iocName == None: + time = self.currentTime() + if TR.initial: + self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "owner": owner, "channelcount": 0} # add IOC to source list + if not TR.connected: + delrec.extend(self.channel_dict.keys()) + for pv in pvNames: + self.channel_dict[pv].append(iocid) # add iocname to pvName in dict + self.iocs[iocid]["channelcount"] += 1 + for pv in delrec: + if iocid in self.channel_dict[pv]: + self.channel_dict[pv].remove(iocid) + self.iocs[iocid]["channelcount"] -= 1 + if self.iocs[iocid]['channelcount'] == 0: + self.iocs.pop(iocid, None) + elif self.iocs[iocid]['channelcount'] < 0: + _log.error("channel count negative!") + if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs + del self.channel_dict[pv] + poll(__updateCF__, self.client, pvNames, delrec, self.channel_dict, self.iocs, hostName, iocName, time, owner) + dict_to_file(self.channel_dict, self.iocs, self.conf) + + def clean_service(self): + sleep = 1 + retry_limit = 5 + owner = self.conf.get('username', 'cfstore') + while 1: + try: + _log.debug("Cleaning service...") + channels = self.client.findByArgs([('pvStatus', 'Active')]) + if channels is not None: + new_channels = [] + for ch in channels or []: + new_channels.append(ch[u'name']) + if len(new_channels) > 0: + self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, + channelNames=new_channels) + _log.debug("Service clean.") + return + except RequestException: + _log.exception("cleaning failed, retrying: ") + + time.sleep(min(60, sleep)) + sleep *= 1.5 + if self.running == 0 and sleep >= retry_limit: + _log.debug("Abandoning clean.") + return + + +def dict_to_file(dict, iocs, conf): + filename = conf.get('debug_file_loc', None) + if filename: + if os.path.isfile(filename): + os.remove(filename) + list = [] + for key in dict: + list.append([key, iocs[dict[key][-1]]['hostname'], iocs[dict[key][-1]]['iocname']]) + + list.sort(key=itemgetter(0)) + + with open(filename, 'wrx') as f: + json.dump(list, f) + + +def __updateCF__(client, new, delrec, channels_dict, iocs, hostName, iocName, time, owner): + if hostName is None or iocName is None: raise Exception('missing hostName or iocName') channels = [] checkPropertiesExist(client, owner) - previousChannelsList = client.findByArgs([('hostName', hostName), ('iocName', iocName)]) - if previousChannelsList != None: - for ch in previousChannelsList: - if pvNames != None and ch[u'name'] in pvNames: - channels.append(updateChannel(ch,\ - owner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ - time=time)) - pvNames.remove(ch[u'name']) - elif pvNames == None or ch[u'name'] not in pvNames: - '''Orphan the channel : mark as inactive, keep the old hostName and iocName''' - oldHostName = hostName - oldIocName = iocName - oldTime = time - for prop in ch[u'properties']: - if prop[u'name'] == u'hostName': - oldHostName = prop[u'value'] - if prop[u'name'] == u'iocName': - oldIocName = prop[u'value'] - if prop[u'name'] == u'time': - oldTime = prop[u'value'] - channels.append(updateChannel(ch, \ - owner=owner, \ - hostName=oldHostName, \ - iocName=oldIocName, \ - pvStatus='InActive', \ - time=oldTime)) + old = client.findByArgs([('hostName', hostName), ('iocName', iocName)]) + if old is not None: + for ch in old: + if new == [] or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc + if ch[u'name'] in channels_dict: + channels.append(updateChannel(ch, + owner=iocs[channels_dict[ch[u'name']][-1]]["owner"], + hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], + iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], + pvStatus='Active', + time=time)) + else: + '''Orphan the channel : mark as inactive, keep the old hostName and iocName''' + oldHostName = hostName + oldIocName = iocName + oldTime = time + for prop in ch[u'properties']: + if prop[u'name'] == u'hostName': + oldHostName = prop[u'value'] + if prop[u'name'] == u'iocName': + oldIocName = prop[u'value'] + if prop[u'name'] == u'time': + oldTime = prop[u'value'] + channels.append(updateChannel(ch, + owner=owner, + hostName=oldHostName, + iocName=oldIocName, + pvStatus='Inactive', + time=oldTime)) + else: + if ch in new: # case: channel in old and new + channels.append(updateChannel(ch, + owner=iocs[channels_dict[ch[u'name']][-1]]["owner"], + hostName=iocs[channels_dict[ch[u'name']][-1]]["hostname"], + iocName=iocs[channels_dict[ch[u'name']][-1]]["iocname"], + pvStatus='Active', + time=time)) + new.remove(ch[u'name']) # now pvNames contains a list of pv's new on this host/ioc - for pv in pvNames: - ch = client.findByArgs([('~name',pv)]) + for pv in new: + ch = client.findByArgs([('~name', pv)]) if not ch: '''New channel''' - channels.append(createChannel(pv, \ - chOwner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ + channels.append(createChannel(pv, + chOwner=owner, + hostName=hostName, + iocName=iocName, + pvStatus='Active', time=time)) - elif len(ch) == 1: + else: '''update existing channel: exists but with a different hostName and/or iocName''' - channels.append(updateChannel(ch[0], \ - owner=owner, \ - hostName=hostName, \ - iocName=iocName, \ - pvStatus='Active', \ + channels.append(updateChannel(ch[0], + owner=owner, + hostName=hostName, + iocName=iocName, + pvStatus='Active', time=time)) - client.set(channels=channels) + if len(channels) != 0: # Fixes a potential server error which occurs when a client.set results in no changes + client.set(channels=channels) + else: + if old and len(old) != 0: + client.set(channels=channels) -def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='InActive', time=None): +def updateChannel(channel, owner, hostName=None, iocName=None, pvStatus='Inactive', time=None): ''' Helper to update a channel object so as to not affect the existing properties ''' # properties list devoid of hostName and iocName properties if channel[u'properties']: - channel[u'properties'] = [property for property in channel[u'properties'] \ - if property[u'name'] != 'hostName' \ - and property[u'name'] != 'iocName'\ - and property[u'name'] != 'pvStatus'] + channel[u'properties'] = [property for property in channel[u'properties'] + if property[u'name'] != 'hostName' + and property[u'name'] != 'iocName' + and property[u'name'] != 'pvStatus' + and property[u'name'] != 'time'] else: channel[u'properties'] = [] - if hostName != None: - channel[u'properties'].append({u'name' : 'hostName', u'owner' : owner, u'value': hostName}) - if iocName != None: - channel[u'properties'].append({u'name' : 'iocName', u'owner' : owner, u'value': iocName}) + if hostName is not None: + channel[u'properties'].append({u'name': 'hostName', u'owner': owner, u'value': hostName}) + if iocName is not None: + channel[u'properties'].append({u'name': 'iocName', u'owner': owner, u'value': iocName}) if pvStatus: - channel[u'properties'].append({u'name' : 'pvStatus', u'owner' : owner, u'value': pvStatus}) + channel[u'properties'].append({u'name': 'pvStatus', u'owner': owner, u'value': pvStatus}) if time: - channel[u'properties'].append({u'name' : 'time', u'owner' : owner, u'value': time}) + channel[u'properties'].append({u'name': 'time', u'owner': owner, u'value': time}) + return channel + +def clean_channel(channel): + # properties list devoid of hostName and iocName properties + if channel[u'properties']: + channel[u'properties'] = [property for property in channel[u'properties'] + if property[u'name'] != 'pvStatus'] + else: + channel[u'properties'] = [] + + channel[u'properties'].append({u'name': 'pvStatus', u'owner': channel['owner'], u'value': 'Inactive'}) return channel -def createChannel(chName, chOwner, hostName=None, iocName=None, pvStatus='InActive', time=None): +def createChannel(chName, chOwner, hostName=None, iocName=None, pvStatus='Inactive', time=None): ''' Helper to create a channel object with the required properties ''' - ch = {u'name':chName,u'owner':chOwner,u'properties':[]} - if hostName != None: - ch[u'properties'].append({u'name' : 'hostName', u'owner' : chOwner, u'value': hostName}) - if iocName != None: - ch[u'properties'].append({u'name' : 'iocName', u'owner' : chOwner, u'value': iocName}) + ch = {u'name': chName, u'owner': chOwner, u'properties': []} + if hostName is not None: + ch[u'properties'].append({u'name': 'hostName', u'owner': chOwner, u'value': hostName}) + if iocName is not None: + ch[u'properties'].append({u'name': 'iocName', u'owner': chOwner, u'value': iocName}) if pvStatus: - ch[u'properties'].append({u'name' : 'pvStatus', u'owner' : chOwner, u'value': pvStatus}) + ch[u'properties'].append({u'name': 'pvStatus', u'owner': chOwner, u'value': pvStatus}) if time: - ch[u'properties'].append({u'name' : 'time', u'owner' : chOwner, u'value': time}) + ch[u'properties'].append({u'name': 'time', u'owner': chOwner, u'value': time}) return ch def checkPropertiesExist(client, propOwner): @@ -175,9 +262,31 @@ def checkPropertiesExist(client, propOwner): ''' requiredProperties = ['hostName', 'iocName', 'pvStatus', 'time'] for propName in requiredProperties: - if client.findProperty(propName) == None: + if client.findProperty(propName) is None: try: - client.set(property={u'name': propName, u'owner':propOwner}) - except Exception as e: - _log.error('Failed to create the property %s: %s',propName, e) + client.set(property={u'name': propName, u'owner': propOwner}) + except Exception: + _log.exception('Failed to create the property %s', propName) + raise + + +def getCurrentTime(): + return str(datetime.datetime.now()) + + +def poll(update, client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner): + _log.debug("Polling begin: ") + sleep = 1 + success = False + while not success: + try: + update(client, new, delrec, channels_dict, iocs, hostName, iocName, times, owner) + success = True + return success + except RequestException as e: + _log.debug("error: " + str(e.message)) + _log.debug("SLEEP: " + str(min(60, sleep))) + _log.debug(str(channels_dict)) + time.sleep(min(60, sleep)) + sleep *= 1.5 diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 7b280e32..81aeff51 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -9,7 +9,7 @@ class ITransaction(Interface): src = Attribute('Source Address.') addrec = Attribute("""Records being added - recid: ('recname', 'rectype', {'key':'val'}) + {recid: ('recname', 'rectype', {'key':'val'})} """) delrec = Attribute('A set() of recids which are being removed') diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py new file mode 100644 index 00000000..c992c5d1 --- /dev/null +++ b/server/recceiver/mock_client.py @@ -0,0 +1,89 @@ +from twisted.internet.address import IPv4Address +from requests import HTTPError +class mock_client(): + def __init__(self): + self.cf = {} + self.connected = True + self.fail_find = False + self.fail_set = False + + def findByArgs(self, args): + if not self.connected: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) + else: + result = [] + + if len(args) > 1: # returning old + for ch in self.cf: + name_flag = False + prop_flag = False + for props in self.cf[ch][u'properties']: + if props[u'name'] == args[0][0]: + if props[u'value'] == args[0][1]: + name_flag = True + if props[u'name'] == args[1][0]: + if props[u'value'] == args[1][1]: + prop_flag = True + if name_flag and prop_flag: + result.append(self.cf[ch]) + return result + else: + if args[0][0] == '~name' and args[0][1] in self.cf: + return [self.cf[args[0][1]]] + if args[0][0] == 'pvStatus' and args[0][1] == 'Active': + for ch in self.cf: + for prop in self.cf[ch]['properties']: + if prop['name'] == 'pvStatus': + if prop['value'] == 'Active': + result.append(self.cf[ch]) + return result + + def findProperty(self, prop_name): + if not self.connected: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) + else: + if prop_name in ['hostName', 'iocName', 'pvStatus', 'time']: + return prop_name + + def set(self, channels): + if not self.connected or self.fail_set: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) + else: + for channel in channels: + self.addChannel(channel) + + def update(self, property, channelNames): + if not self.connected or self.fail_find: + raise HTTPError("Mock Channelfinder Client HTTPError", response=self) + else: + for channel in channelNames: + self.__updateChannelWithProp(property, channel) + + def addChannel(self, channel): + self.cf[channel[u'name']] = channel + + def __updateChannelWithProp(self, property, channel): + if channel in self.cf: + for prop in self.cf[channel]['properties']: + if prop['name'] == property['name']: + prop['value'] = property['value'] + prop['owner'] = property['owner'] # also overwriting owner because that's what CF does + return + +class mock_conf(): + def __init__(self): + pass + + def get(self, name, target): + return "cf-update" + +class mock_TR(): + def __init__(self): + self.addrec = {} + self.src = IPv4Address('TCP', 'testhosta', 1111) + self.delrec = () + self.infos = {'CF_USERNAME': 'cf-update', 'ENGINEER': 'cf-engi'} + self.initial = True + self.connected = True + self.fail_set = False + self.fail_find = False \ No newline at end of file diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index d662fe74..1a9dd01a 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -87,6 +87,7 @@ def timed(self): self.phase = 2 self.nonce = random.randint(0,0xffffffff) self.writeMsg(0x8002, _ping.pack(self.nonce)) + _log.debug("ping nonce: " + str(self.nonce)) def getInitialState(self): return (self.recvHeader, 8) @@ -124,6 +125,7 @@ def recvPong(self, body): _log.error('pong nonce does not match! %s!=%s',nonce,self.nonce) self.transport.loseConnection() else: + _log.debug('pong nonce match') self.phase = 1 return self.getInitialState() @@ -241,7 +243,7 @@ def flush(self, connected=True): op = self.factory.commit(TR) if op: op.addCallbacks(self.resume, self.abort) - self.proto.transport.pauseProducing() + #self.proto.transport.pauseProducing() self.op = op def resume(self, arg): diff --git a/server/recceiver/scripts/add_extra_properties.py b/server/recceiver/scripts/add_extra_properties.py new file mode 100644 index 00000000..1cb0e119 --- /dev/null +++ b/server/recceiver/scripts/add_extra_properties.py @@ -0,0 +1,23 @@ +from channelfinder import ChannelFinderClient + +''' +Simple script for adding active channels to Channel Finder Service for testing cf-store clean +If it gives a 500 error, run it again. Glassfish and CFS must be set up and running. +''' + + +def abbr(name, hostname, iocname, status): + return {u'owner': 'cf-update', + u'name': name, + u'properties': [ + {u'owner': 'cf-update', u'name': 'hostName', + u'value': hostname}, + {u'owner': 'cf-update', u'name': 'iocName', + u'value': iocname}, + {u'owner': 'cf-update', u'name': 'pvStatus', + u'value': status}, + {u'owner': 'cf-update', u'name': 'time', + u'value': '2016-08-18 12:33:09.953985'}]} + +client = ChannelFinderClient() +client.set(channels=[abbr(u'ch1', 'testhosta', 1111, 'Active'), abbr(u'test_channel', 'testhosta', 1111, 'Active')]) diff --git a/server/recceiver/scripts/print_cf_data.py b/server/recceiver/scripts/print_cf_data.py new file mode 100644 index 00000000..686a6202 --- /dev/null +++ b/server/recceiver/scripts/print_cf_data.py @@ -0,0 +1,36 @@ +from channelfinder import ChannelFinderClient +import json +import os +from operator import itemgetter +filename = "/home/devuser/cfdata" # change this to output file name +client = ChannelFinderClient() + + +def get_cf_data(client): + channels = client.findByArgs([('pvStatus', 'Active')]) + + for ch in channels: + ch.pop('owner', None) + ch.pop('tags', None) + for prop in ch['properties']: + if prop['name'] == 'hostName': + ch['hostName'] = prop['value'] + if prop['name'] == 'iocName': + ch['iocName'] = prop['value'] + ch.pop('properties', None) + return channels + +channels = get_cf_data(client) + +if os.path.isfile(filename): + os.remove(filename) + +new_list = [] + +for channel in channels: + new_list.append([channel['name'], channel['hostName'], int(channel['iocName'])]) + +new_list.sort(key=itemgetter(0)) + +with open(filename, 'wrx') as f: + json.dump(new_list, f) diff --git a/server/recceiver/scripts/readme b/server/recceiver/scripts/readme new file mode 100644 index 00000000..c04ee1e4 --- /dev/null +++ b/server/recceiver/scripts/readme @@ -0,0 +1,75 @@ +MANUAL TEST LOG: + +PASSED - Clean and poll on recsync start +PASSED - Clean and poll on recsync close +PASSED - Clean and poll on IOC start +PASSED - Clean and poll on IOC close +PASSED - 100 IOCs overlapping 1 IOC + +Clean and poll on recsync start: +0.a CF on +0.b add_extra_properties + - should see the new props in CF +1. CF off +2. Start recsync + - will poll in clean +3. Enable CF + - polling completes +4. CF will clean +5. Check result + - all channels inactive + +Clean and poll on recsync close: +1. CF on +2. Start recsync +3. add_extra_properties + - should see new props in CF +4. CF off +5. Recsync off + - will poll in clean + - (NYI) will give up if waiting too long +6. CF on +7. Check result + - all channels inactive + +Commit poll on IOC start: +1. Enable CF +2. Start recsync + - will perform clean +3. Disable CF +4. Start IOC + - waits for recsync announcement + - connects and polls commit +5. Enable CF +6. Commit poll fails once, reconnects, and commits +7. Check result + - all channels active + +Commit poll on IOC close: +1. Enable CF +2. Start Recsync +3. Start IOC +4. Wait for IOC to connect and commit + - all IOC channels now active +5. Disable CF +6. Close IOC + - commits [] and polls +7. Enable CF + - reconnects after polling and completes commit +8. Check result + - all channels inactive + +100 IOCs overlapping 1 IOC: +0. * Disable logging in demo.conf * +1. Enable CF +2. Start recsync +3. Start st.cmd in a command prompt + - IOC will connect and commit channels + - 4 channels will be active +4. run test_mock_iocs.py + - wait forever + - all channels will be active + - 2 channels will still belong to st.cmd +5. kill test_mock_iocs.py or killall st_test.cmd + - wait forever again + - 4 channels will be active, all belonging to st.cmd \ No newline at end of file diff --git a/server/recceiver/scripts/test_mock_iocs.py b/server/recceiver/scripts/test_mock_iocs.py new file mode 100644 index 00000000..e389fbc7 --- /dev/null +++ b/server/recceiver/scripts/test_mock_iocs.py @@ -0,0 +1,52 @@ +import os +import threading +import sys +import time +import signal + +iocexecutable = "st.cmd" + + +def startIOC(): + # conf needs to be set + pid, fd = os.forkpty() + if pid == 0: + os.chdir("../../../client/iocBoot/iocdemo") + print os.curdir + os.execv("st_test.cmd", ['']) + return pid, fd + + +def readfd(fd): + while 1: + empt = str(os.read(fd, 16384).strip("\n")) + + +def handler(signum, frame): + global pids + for pid in pids: + os.kill(pid, signal.SIGKILL) + sys.exit() + + +def main(): + global pids + pids = [] + signal.signal(signal.SIGTERM, handler) + os.chdir(os.path.dirname(os.path.abspath(sys.argv[0]))) # Uses a filename, not good, also only works on linux? + threads = [] + for i in range(1, 100): + iocpid, iocfd = startIOC() + pids.append(iocpid) + print "len pids: ", len(pids) + iocthread = threading.Thread(group=None, target=readfd, args=(iocfd,), name="iocthread", kwargs={}) + threads.append(iocthread) + iocthread.start() + try: + while 1: + time.sleep(1) + except KeyboardInterrupt: + sys.exit() + +if __name__ == '__main__': + main() diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py new file mode 100644 index 00000000..2479cc8b --- /dev/null +++ b/server/recceiver/test_cfstore.py @@ -0,0 +1,261 @@ +from twisted.trial.unittest import TestCase +from twisted.internet import defer +from mock_client import mock_conf +from mock_client import mock_TR +from mock_client import mock_client +from cfstore import CFProcessor + +import threading +import time + + +class MyTestCase(TestCase): + def setUp(self): + conf = mock_conf() + self.cfstore = CFProcessor("cf", conf) + self.cfstore.currentTime = getTime + self.maxDiff = None + + @defer.inlineCallbacks + def test_3_iocs(self): + self.cfclient = mock_client() + self.cfstore.client = self.cfclient + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR1) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + TR2 = mock_TR() + TR2.src.host = 'testhostb' + TR2.src.port = 2222 + TR2.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR2) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR3 = mock_TR() + TR3.src.host = 'testhostc' + TR3.src.port = 3333 + TR3.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} + deferred = yield self.cfstore.commit(TR3) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR4 = mock_TR() + TR4.initial = False + TR4.addrec = {} + TR4.connected = False # simulated IOC Disconnect + TR4.src.host = 'testhostc' + TR4.src.port = 3333 + deferred = yield self.cfstore.commit(TR4) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR5 = mock_TR() + TR5.src.host = 'testhostc' + TR5.src.port = 3333 + TR5.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} + deferred = yield self.cfstore.commit(TR5) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR6 = mock_TR() + TR6.initial = False + TR6.addrec = {} + TR6.connected = False # simulated IOC Disconnect + TR6.src.host = 'testhostb' + TR6.src.port = 2222 + deferred = yield self.cfstore.commit(TR6) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + TR7 = mock_TR() + TR7.initial = False + TR7.addrec = {} + TR7.connected = False # simulated IOC Disconnect + TR7.src.host = 'testhosta' + TR7.src.port = 1111 + deferred = yield self.cfstore.commit(TR7) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Active'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + + TR8 = mock_TR() + TR8.initial = False + TR8.addrec = {} + TR8.connected = False # simulated IOC Disconnect + TR8.src.host = 'testhostc' + TR8.src.port = 3333 + deferred = yield self.cfstore.commit(TR8) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostc', 3333, 'Inactive'), + u'ch2': abbr(u'ch2', 'testhostc', 3333, 'Inactive'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Inactive'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Inactive')}) + + TR9 = mock_TR() + TR9.src.host = 'testhostb' + TR9.src.port = 2222 + TR9.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR9) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch2': abbr(u'ch2', 'testhostb', 2222, 'Active'), + u'ch3': abbr(u'ch3', 'testhostb', 2222, 'Active'), + u'ch4': abbr(u'ch4', 'testhostc', 3333, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostc', 3333, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')}) + + TR10 = mock_TR() + TR10.src.host = 'testhosta' + TR10.src.port = 1111 + TR10.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 4: ('ch4', 'stringin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR10) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch2': abbr(u'ch2', 'testhosta', 1111, 'Active'), + u'ch3': abbr(u'ch3', 'testhosta', 1111, 'Active'), + u'ch4': abbr(u'ch4', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + @defer.inlineCallbacks + def test_no_CFS(self): + self.cfclient = mock_client() + self.cfclient.connected = False + self.cfstore.client = self.cfclient + rcon_thread = threading.Timer(2, self.simulate_reconnect) + rcon_thread.start() + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR1) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + @defer.inlineCallbacks + def test_set_fail(self): + self.cfclient = mock_client() + self.cfclient.fail_set = True + self.cfstore.client = self.cfclient + self.cfstore.startService() + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + rcon_thread = threading.Timer(2, self.simulate_reconnect) + rcon_thread.start() + deferred = yield self.cfstore.commit(TR1) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + @defer.inlineCallbacks + def test_clean_service(self): + self.cfclient = mock_client() + self.cfclient.fail_find = True + self.cfclient.cf = {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Active'), + u'ch5': abbr(u'ch5', 'testhostb', 2222, 'Active'), + u'ch6': abbr(u'ch6', 'testhostb', 2222, 'Active'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Active')} + self.cfstore.client = self.cfclient + rcon_thread = threading.Timer(2, self.simulate_reconnect) + rcon_thread.start() + self.cfstore.startService() + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhostb', 2222, 'Inactive'), + u'ch5': abbr(u'ch5', 'testhostb', 2222, 'Inactive'), + u'ch6': abbr(u'ch6', 'testhostb', 2222, 'Inactive'), + u'ch7': abbr(u'ch7', 'testhostb', 2222, 'Inactive')}) + TR1 = mock_TR() + TR1.src.host = 'testhosta' + TR1.src.port = 1111 + TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} + deferred = yield self.cfstore.commit(TR1) + self.assertDictEqual(self.cfstore.client.cf, + {u'ch1': abbr(u'ch1', 'testhosta', 1111, 'Active'), + u'ch5': abbr(u'ch5', 'testhosta', 1111, 'Active'), + u'ch6': abbr(u'ch6', 'testhosta', 1111, 'Active'), + u'ch7': abbr(u'ch7', 'testhosta', 1111, 'Active')}) + + def assertCommit(self, dict): + self.assertDictEqual(self.cfstore.client.cf, dict) + + def simulate_reconnect(self): + self.cfclient.connected = True # There is the potential for a race condition here + self.cfclient.fail_set = False # This would cause a connection failure and re-polling at a different point + self.cfclient.fail_find = False + + +def abbr(name, hostname, iocname, status): + return {u'owner': 'cf-update', + u'name': name, + u'properties': [ + {u'owner': 'cf-update', u'name': 'hostName', + u'value': hostname}, + {u'owner': 'cf-update', u'name': 'iocName', + u'value': iocname}, + {u'owner': 'cf-update', u'name': 'pvStatus', + u'value': status}, + {u'owner': 'cf-update', u'name': 'time', + u'value': '2016-08-18 12:33:09.953985'}]} + + +def getTime(): + return '2016-08-18 12:33:09.953985' + +# if __name__ == '__main__': +# unittest.main()