diff --git a/.github/workflows/conda_env/environment_dask_2.30.0.yml b/.github/workflows/conda_env/environment_dask_2.30.0.yml index 84595d20..35322918 100644 --- a/.github/workflows/conda_env/environment_dask_2.30.0.yml +++ b/.github/workflows/conda_env/environment_dask_2.30.0.yml @@ -10,3 +10,4 @@ dependencies: - dask=2.30.0 - click<8 - coverage!=6.3 +- flask=2.0.3 diff --git a/.github/workflows/conda_env/environment_dask_2.5.2.yml b/.github/workflows/conda_env/environment_dask_2.5.2.yml index 3d07e1c5..875acdad 100644 --- a/.github/workflows/conda_env/environment_dask_2.5.2.yml +++ b/.github/workflows/conda_env/environment_dask_2.5.2.yml @@ -11,3 +11,4 @@ dependencies: - click<8 - msgpack-python<1 - coverage!=6.3 +- flask=2.0.3 diff --git a/.github/workflows/conda_env/environment_linux.yml b/.github/workflows/conda_env/environment_linux.yml index bbde8404..180a7df1 100644 --- a/.github/workflows/conda_env/environment_linux.yml +++ b/.github/workflows/conda_env/environment_linux.yml @@ -9,3 +9,4 @@ dependencies: - dask=2022.08.1 - dakota - coverage!=6.3 +- flask=2.2.2 diff --git a/.github/workflows/conda_env/environment_macos.yml b/.github/workflows/conda_env/environment_macos.yml index 07877bd5..5e7cf35b 100644 --- a/.github/workflows/conda_env/environment_macos.yml +++ b/.github/workflows/conda_env/environment_macos.yml @@ -7,3 +7,4 @@ dependencies: - psutil - dask=2022.08.1 - coverage!=6.3 +- flask=2.2.2 diff --git a/.github/workflows/conda_env/environment_static_analysis.yml b/.github/workflows/conda_env/environment_static_analysis.yml index bdaa7ac3..d9bdb19a 100644 --- a/.github/workflows/conda_env/environment_static_analysis.yml +++ b/.github/workflows/conda_env/environment_static_analysis.yml @@ -9,3 +9,4 @@ dependencies: - codespell=2.2.1 - dask=2022.08.1 - pytest +- flask=2.2.2 diff --git a/.github/workflows/workflows.yml b/.github/workflows/workflows.yml index b9f9a547..0cb07794 100644 --- a/.github/workflows/workflows.yml +++ b/.github/workflows/workflows.yml @@ -98,6 +98,7 @@ jobs: python3-pytest-cov python3-pytest-timeout python3-psutil + python3-flask - name: Install IPS (in develop mode) run: python -m pip install -e . - name: testing running IPS (--help) diff --git a/ipsframework/component.py b/ipsframework/component.py index fd548f98..efb5847d 100644 --- a/ipsframework/component.py +++ b/ipsframework/component.py @@ -121,7 +121,7 @@ def __run__(self): while True: msg = self.__invocation_q.get() - self.services.log('Received Message ') + self.services.debug('Received Message ') sender_id = msg.sender_id self.__call_id = msg.call_id self.__method_name = msg.target_method diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py index 4499f49b..54cf82f6 100644 --- a/ipsframework/configurationManager.py +++ b/ipsframework/configurationManager.py @@ -377,7 +377,7 @@ def _initialize_fwk_components(self): portal_conf['DATA_FILES'] = '' portal_conf['OUTPUT_FILES'] = '' portal_conf['NPROC'] = 1 - portal_conf['LOG_LEVEL'] = 'WARNING' + portal_conf['LOG_LEVEL'] = 'INFO' try: portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER'] except KeyError: diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 128b88b9..746a0505 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -6,7 +6,7 @@ import datetime import sys import os -from subprocess import Popen, PIPE +from multiprocessing import Process, Pipe, Event import time import inspect from collections import defaultdict @@ -14,10 +14,11 @@ import glob import itertools import json -import shutil +import urllib3 from ipsframework import ipsutil, Component from ipsframework.convert_log_function import convert_logdata_to_html + try: from mpo_arg import mpo_methods as mpo except ImportError: @@ -51,6 +52,33 @@ def hash_file(file_name): # pragma: no cover return hasher.hexdigest() +def send_post(conn, stop, url): + fail_count = 0 + + http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25), + headers={'Content-Type': 'application/json'}) + + while True: + if conn.poll(0.1): + msgs = [] + while conn.poll(0.01): + msgs.append(conn.recv()) + try: + resp = http.request("POST", url, body=json.dumps(msgs).encode()) + except urllib3.exceptions.MaxRetryError as e: + fail_count += 1 + conn.send((999, str(e))) + else: + conn.send((resp.status, resp.data.decode())) + fail_count = 0 + + if fail_count >= 3: + conn.send((-1, "Too many consecutive failed connections")) + break + elif stop.is_set(): + break + + class PortalBridge(Component): """ Framework component to communicate with the SWIM web portal. @@ -88,10 +116,10 @@ def __init__(self, services, config): self.done = False self.first_event = True self.childProcess = None + self.childProcessStop = None + self.parent_conn = None self.mpo = None self.mpo_name_counter = defaultdict(lambda: 0) - self.file_hash_cache = defaultdict(dict) - self.file_uid_cache = defaultdict(dict) self.counter = 0 self.dump_freq = 10 self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation @@ -208,8 +236,9 @@ def process_event(self, topicName, theEvent): if len(self.sim_map) == 0: if self.childProcess: - self.childProcess.stdin.close() - self.childProcess.wait() + self.childProcessStop.set() + self.childProcess.join() + self.check_send_post_responses() self.done = True self.services.debug('No more simulation to monitor - exiting') time.sleep(1) @@ -248,25 +277,49 @@ def send_event(self, sim_data, event_data): except Exception: self.services.exception("Error writing html file into USER_W3_DIR directory") self.write_to_htmldir = False + if self.portal_url: - webmsg = json.dumps(event_data) + if self.first_event: # First time, launch sendPost.py daemon + self.parent_conn, child_conn = Pipe() + self.childProcessStop = Event() + self.childProcess = Process(target=send_post, args=(child_conn, self.childProcessStop, self.portal_url)) + self.childProcess.start() + self.first_event = False + try: - if self.first_event: # First time, launch sendPost.py daemon - cmd = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sendPost.py') - python_exec = shutil.which('python3') - self.childProcess = Popen([python_exec, cmd], bufsize=128, - stdin=PIPE, stdout=PIPE, - stderr=PIPE, close_fds=True) - self.first_event = False - self.childProcess.stdin.write(('%s %s\n' % - (self.portal_url, webmsg)).encode()) - self.childProcess.stdin.flush() - except Exception as e: - self.services.exception('Error transmitting event number %6d to %s : %s', - sim_data.counter, self.portal_url, str(e)) + self.parent_conn.send(event_data) + except OSError: + pass + + self.check_send_post_responses() + if sim_data.mpo_wid: self.send_mpo_data(event_data, sim_data) + def check_send_post_responses(self): + while self.parent_conn.poll(): + try: + code, msg = self.parent_conn.recv() + except (EOFError, OSError): + break + + try: + data = json.loads(msg) + if "runid" in data: + self.services.info("Run Portal URL = %s/%s", self.portal_url, data.get('runid')) + + msg = json.dumps(data) + except (TypeError, json.decoder.JSONDecodeError): + pass + if code == 200: + self.services.debug("Portal Response: %d %s", code, msg) + elif code == -1: + # disable portal, stop trying to send more data + self.portal_url = None + self.services.error("Disabling portal because: %s", msg) + else: + self.services.error("Portal Error: %d %s", code, msg) + def send_mpo_data(self, event_data, sim_data): # pragma: no cover def md5(fname): "Courtesy of stackoverflow 3431825" diff --git a/ipsframework/sendPost.py b/ipsframework/sendPost.py deleted file mode 100644 index 5eef2bac..00000000 --- a/ipsframework/sendPost.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# ------------------------------------------------------------------------------- -# Copyright 2006-2021 UT-Battelle, LLC. See LICENSE for more information. -# ------------------------------------------------------------------------------- - -import sys -from urllib import request, error -import socket -import time -import traceback - -headers = {'Content-Type': 'application/json'} - - -def sendEncodedMessage(url, msg): - if not isinstance(msg, bytes): - msg = msg.encode('utf-8') - - num_trials = 2 - trial = 0 - delay = [0.4, 0.8, 1.2] - - while trial < num_trials: - try: - req = request.Request(url, data=msg, headers=headers, method='POST') - resp = request.urlopen(req) - except error.URLError: - trial += 1 - if trial > num_trials: - open('PORTAL.err', 'a').write('%s\n' % (msg)) - else: - time.sleep(delay[trial-1]) - else: - break - try: - resp.close() - except Exception: - pass - - -if __name__ == "__main__": - """ Loop over input from stdin, expecting lines of the format: - URL ENCODED_WEB_MSG - """ - timeout = 3 - socket.setdefaulttimeout(timeout) - error_f = open("sendpost.err", 'w') - line = ' ' - while True: - try: - line = sys.stdin.readline().rstrip('\n') - if line == '': - break - tokens = line.split(' ', 1) - sendEncodedMessage(tokens[0], tokens[1]) - except Exception: - traceback.print_exc(file=error_f) - sys.exit(0) diff --git a/setup.py b/setup.py index 1150cc0f..384dd71d 100644 --- a/setup.py +++ b/setup.py @@ -60,5 +60,8 @@ "Operating System :: POSIX :: Linux", ], python_requires='>=3.6', - zip_safe=True + zip_safe=True, + install_requires=[ + 'urllib3' + ] ) diff --git a/tests/components/drivers/simple_driver.py b/tests/components/drivers/simple_driver.py new file mode 100644 index 00000000..f59e3fde --- /dev/null +++ b/tests/components/drivers/simple_driver.py @@ -0,0 +1,7 @@ +from ipsframework import Component + + +class driver(Component): + def step(self, timestamp=0.0, **keywords): + w = self.services.get_port('WORKER') + self.services.call(w, 'step', 0) diff --git a/tests/components/workers/simple_sleep.py b/tests/components/workers/simple_sleep.py index 97a357c6..9369b9fa 100644 --- a/tests/components/workers/simple_sleep.py +++ b/tests/components/workers/simple_sleep.py @@ -1,14 +1,17 @@ # ------------------------------------------------------------------------------- # Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. # ------------------------------------------------------------------------------- +import time from ipsframework import Component class simple_sleep(Component): def step(self, timestamp=0.0, **keywords): + time.sleep(1) self.services.wait_task( self.services.launch_task(1, "/tmp", "/bin/sleep", 1) ) + time.sleep(1) diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index e3cee99f..058c9d7f 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -1,10 +1,6 @@ import os -import sys import shutil import json -import glob -import socketserver -import hashlib import pytest from ipsframework import Framework, TaskPool @@ -261,120 +257,3 @@ def test_helloworld_task_pool_dask(tmpdir, capfd): for task in ["bin", "meth", "func"]: assert f'{task}_{n}' in exit_status assert exit_status[f'{task}_{n}'] == 0 - - -@pytest.mark.skipif(sys.platform == 'darwin', reason="This doesn't work with macOS") -@pytest.mark.timeout(120) -def test_helloworld_portal(tmpdir, capfd): - data_dir = os.path.dirname(__file__) - copy_config_and_replace(os.path.join(data_dir, "hello_world.ips"), tmpdir.join("hello_world.ips"), tmpdir, portal=True) - shutil.copy(os.path.join(data_dir, "platform.conf"), tmpdir) - shutil.copy(os.path.join(data_dir, "hello_driver.py"), tmpdir) - shutil.copy(os.path.join(data_dir, "hello_worker.py"), tmpdir) - - # standup simple socketserver to capture data from sendPost.py - - data = [] - - class TCPHandler(socketserver.BaseRequestHandler): - def handle(self): - data.append(self.request.recv(1024).strip().decode()) - - with socketserver.TCPServer(("localhost", 8080), TCPHandler) as server: - server.timeout = 1 - - framework = Framework(config_file_list=[os.path.join(tmpdir, "hello_world.ips")], - log_file_name=str(tmpdir.join('test.log')), - platform_file_name=os.path.join(tmpdir, "platform.conf"), - debug=None, - verbose_debug=None, - cmd_nodes=0, - cmd_ppn=0) - - assert framework.log_file_name.endswith('test.log') - - fwk_components = framework.config_manager.get_framework_components() - assert len(fwk_components) == 2 - assert 'Hello_world_1_FWK@runspaceInitComponent@3' in fwk_components - assert 'Hello_world_1_FWK@PortalBridge@4' in fwk_components - - component_map = framework.config_manager.get_component_map() - - assert len(component_map) == 1 - assert 'Hello_world_1' in component_map - hello_world_1 = component_map['Hello_world_1'] - assert len(hello_world_1) == 1 - assert hello_world_1[0].get_class_name() == 'HelloDriver' - assert hello_world_1[0].get_instance_name().startswith('Hello_world_1@HelloDriver') - assert hello_world_1[0].get_seq_num() == 1 - assert hello_world_1[0].get_serialization().startswith('Hello_world_1@HelloDriver') - assert hello_world_1[0].get_sim_name() == 'Hello_world_1' - - framework.run() - - # just get the first 5 events - for _ in range(5): - server.handle_request() - - captured = capfd.readouterr() - - captured_out = captured.out.split('\n') - assert captured_out[0].startswith("Starting IPS") - assert captured_out[1] == "Created " - assert captured_out[2] == "Created " - assert captured_out[3] == 'HelloDriver: init' - assert captured_out[4] == 'HelloDriver: finished worker init call' - assert captured_out[5] == 'HelloDriver: beginning step call' - assert captured_out[6] == 'Hello from HelloWorker' - assert captured_out[7] == 'HelloDriver: finished worker call' - assert captured.err == '' - - # check that portal created output folders - assert os.path.exists(tmpdir.join("simulation_log")) - assert os.path.exists(tmpdir.join("www")) - - # check output files exist - www_files = glob.glob(str(tmpdir.join("www").join("*"))) - assert len(www_files) == 1 - assert os.path.basename(www_files[0]).startswith("Hello_world_1_") - assert www_files[0].endswith(".html") - files = glob.glob(str(tmpdir.join("simulation_log").join("*"))) - assert len(files) == 3 - exts = [os.path.splitext(f)[1] for f in files] - assert '.json' in exts - assert '.html' in exts - assert '.eventlog' in exts - - # check data sent to portal - assert len(data) == 5 - # get first event to check - event = json.loads(data[0].split('\r\n')[-1]) - assert event['code'] == 'Framework' - assert event['eventtype'] == 'IPS_START' - assert event['comment'] == 'Starting IPS Simulation' - assert event['state'] == 'Running' - assert event['sim_name'] == 'Hello_world_1' - assert event['seqnum'] == 0 - assert 'ips_version' in event - assert 'time' in event - - # get last event to check - event = json.loads(data[-1].split('\r\n')[-1]) - assert event['code'] == 'DRIVERS_HELLO_HelloDriver' - assert event['eventtype'] == 'IPS_CALL_END' - assert event['comment'] == 'Target = Hello_world_1@HelloWorker@2:init(0.000)' - assert event['state'] == 'Running' - assert event['sim_name'] == 'Hello_world_1' - assert 'time' in event - assert 'trace' in event - trace = event['trace'] - assert 'duration' in trace - assert 'timestamp' in trace - assert 'id' in trace - assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000):7'.encode()).hexdigest()[:16] - assert 'traceId' in trace - assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest() - assert 'parentId' in trace - assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0):5'.encode()).hexdigest()[:16] - assert 'localEndpoint' in trace - assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2' diff --git a/tests/new/test_portal.py b/tests/new/test_portal.py new file mode 100644 index 00000000..ef8eabc3 --- /dev/null +++ b/tests/new/test_portal.py @@ -0,0 +1,188 @@ +import json +import hashlib +import sys +from multiprocessing import Process, set_start_method +import pytest +from ipsframework import Framework + +# Try using fork for starting subprocesses, this is the default on +# Linux but not macOS with python >= 3.8 +if sys.platform == 'darwin': + try: + set_start_method('fork') + except RuntimeError: + # context can only be set once + pass + + +def write_basic_config_and_platform_files(tmpdir): + platform_file = tmpdir.join('platform.conf') + + platform = """MPIRUN = eval +NODE_DETECTION = manual +CORES_PER_NODE = 2 +SOCKETS_PER_NODE = 1 +NODE_ALLOCATION_MODE = shared +HOST = +SCRATCH = +""" + + with open(platform_file, 'w') as f: + f.write(platform) + + config_file = tmpdir.join('ips.config') + + config = f"""RUN_COMMENT = portal testing +SIM_NAME = portal_test +LOG_FILE = {str(tmpdir)}/sim.log +LOG_LEVEL = INFO +SIM_ROOT = {str(tmpdir)} +SIMULATION_MODE = NORMAL +USE_PORTAL = True +PORTAL_URL = http://localhost:18080 +[PORTS] + NAMES = DRIVER WORKER + [[DRIVER]] + IMPLEMENTATION = DRIVER + [[WORKER]] + IMPLEMENTATION = WORKER +[DRIVER] + CLASS = DRIVER + SUB_CLASS = + NAME = driver + BIN_PATH = + NPROC = 1 + INPUT_FILES = + OUTPUT_FILES = + SCRIPT = + MODULE = components.drivers.simple_driver +[WORKER] + CLASS = WORKER + SUB_CLASS = + NAME = simple_sleep + NPROC = 1 + BIN_PATH = + INPUT_FILES = + OUTPUT_FILES = + SCRIPT = + MODULE = components.workers.simple_sleep +""" + + with open(config_file, 'w') as f: + f.write(config) + + return platform_file, config_file + + +def test_portal(tmpdir): + pytest.importorskip("flask") + from flask import Flask, jsonify, request # pylint: disable=import-outside-toplevel + platform_file, config_file = write_basic_config_and_platform_files(tmpdir) + + # standup simple flask server to test send_post + def flask_server(): + app = Flask("IPS portal") + + @app.route("/", methods=["POST"]) + def api(): + data = request.get_json() + return jsonify(message="Events added to run", events=len(data), runid=42, event=data), 200 + + app.run(port=18080) + + p = Process(target=flask_server) + p.start() + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=True, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + p.terminate() + + with open(str(tmpdir.join('ips.log')), 'r') as f: + lines = f.readlines() + + URLs = [line[57:] for line in lines if "FWK_COMP_PortalBridge_4 INFO" in line] + assert len(URLs) > 0 + assert URLs[0] == "Run Portal URL = http://localhost:18080/42\n" + + # remove timestamp and common start + lines = [(int(code), json.loads(data)) for (code, data) in + [line[74:].strip().split(maxsplit=1) for line in lines if "FWK_COMP_PortalBridge_4 DEBUG Portal Response: " in line]] + + for code, _ in lines: + assert code == 200 + + # check number of events sent to portal + assert sum(data.get("events") for _, data in lines) == 13 + # get first event to check + data = lines[0][1] + assert data['runid'] == 42 + assert data['message'] == 'Events added to run' + + event = data['event'][0] + assert event['code'] == 'Framework' + assert event['eventtype'] == 'IPS_START' + assert event['comment'] == 'Starting IPS Simulation' + assert event['state'] == 'Running' + assert event['sim_name'] == 'portal_test' + assert event['seqnum'] == 0 + assert 'ips_version' in event + assert 'time' in event + + # get last event to check + data = lines[-1][1] + assert data['runid'] == 42 + assert data['message'] == 'Events added to run' + + event = data['event'][-1] + assert event['code'] == 'Framework' + assert event['eventtype'] == 'IPS_END' + assert event['comment'] == 'Simulation Ended' + assert event['state'] == 'Completed' + assert event['sim_name'] == 'portal_test' + assert event['seqnum'] == 12 + assert 'time' in event + assert 'trace' in event + trace = event['trace'] + assert 'duration' in trace + assert 'timestamp' in trace + assert 'id' in trace + assert trace['id'] == hashlib.md5('portal_test@FRAMEWORK@Framework@0'.encode()).hexdigest()[:16] + assert 'traceId' in trace + assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest() + assert 'parentId' not in trace + assert 'localEndpoint' in trace + assert trace['localEndpoint']['serviceName'] == 'portal_test@FRAMEWORK@Framework@0' + + +def test_portal_no_server(tmpdir): + platform_file, config_file = write_basic_config_and_platform_files(tmpdir) + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + with open(str(tmpdir.join('ips.log')), 'r') as f: + lines = f.readlines() + + # remove timestamp and common start + lines = [line[57:] for line in lines if "FWK_COMP_PortalBridge_4 ERROR" in line] + + assert len(lines) == 4 + # should fail 3 time then disable the portal + for n in range(3): + assert lines[n].startswith("Portal Error: 999 HTTPConnectionPool") + + assert lines[-1] == "Disabling portal because: Too many consecutive failed connections\n"