Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/conda_env/environment_dask_2.30.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dependencies:
- dask=2.30.0
- click<8
- coverage!=6.3
- flask=2.0.3
1 change: 1 addition & 0 deletions .github/workflows/conda_env/environment_dask_2.5.2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ dependencies:
- click<8
- msgpack-python<1
- coverage!=6.3
- flask=2.0.3
1 change: 1 addition & 0 deletions .github/workflows/conda_env/environment_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dependencies:
- dask=2022.08.1
- dakota
- coverage!=6.3
- flask=2.2.2
1 change: 1 addition & 0 deletions .github/workflows/conda_env/environment_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ dependencies:
- psutil
- dask=2022.08.1
- coverage!=6.3
- flask=2.2.2
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dependencies:
- codespell=2.2.1
- dask=2022.08.1
- pytest
- flask=2.2.2
1 change: 1 addition & 0 deletions .github/workflows/workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ jobs:
python3-pytest-cov
python3-pytest-timeout
python3-psutil
python3-flask
- name: Install IPS (in develop mode)
run: python -m pip install -e .
- name: testing running IPS (--help)
Expand Down
2 changes: 1 addition & 1 deletion ipsframework/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __run__(self):

while True:
msg = self.__invocation_q.get()
self.services.log('Received Message ')
self.services.debug('Received Message ')
sender_id = msg.sender_id
self.__call_id = msg.call_id
self.__method_name = msg.target_method
Expand Down
2 changes: 1 addition & 1 deletion ipsframework/configurationManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _initialize_fwk_components(self):
portal_conf['DATA_FILES'] = ''
portal_conf['OUTPUT_FILES'] = ''
portal_conf['NPROC'] = 1
portal_conf['LOG_LEVEL'] = 'WARNING'
portal_conf['LOG_LEVEL'] = 'INFO'
try:
portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
except KeyError:
Expand Down
93 changes: 73 additions & 20 deletions ipsframework/portalBridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
import datetime
import sys
import os
from subprocess import Popen, PIPE
from multiprocessing import Process, Pipe, Event
import time
import inspect
from collections import defaultdict
import hashlib
import glob
import itertools
import json
import shutil
import urllib3
from ipsframework import ipsutil, Component
from ipsframework.convert_log_function import convert_logdata_to_html


try:
from mpo_arg import mpo_methods as mpo
except ImportError:
Expand Down Expand Up @@ -51,6 +52,33 @@ def hash_file(file_name): # pragma: no cover
return hasher.hexdigest()


def send_post(conn, stop, url):
fail_count = 0

http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25),
headers={'Content-Type': 'application/json'})

while True:
if conn.poll(0.1):
msgs = []
while conn.poll(0.01):
msgs.append(conn.recv())
try:
resp = http.request("POST", url, body=json.dumps(msgs).encode())
except urllib3.exceptions.MaxRetryError as e:
fail_count += 1
conn.send((999, str(e)))
else:
conn.send((resp.status, resp.data.decode()))
fail_count = 0

if fail_count >= 3:
conn.send((-1, "Too many consecutive failed connections"))
break
elif stop.is_set():
break


class PortalBridge(Component):
"""
Framework component to communicate with the SWIM web portal.
Expand Down Expand Up @@ -88,10 +116,10 @@ def __init__(self, services, config):
self.done = False
self.first_event = True
self.childProcess = None
self.childProcessStop = None
self.parent_conn = None
self.mpo = None
self.mpo_name_counter = defaultdict(lambda: 0)
self.file_hash_cache = defaultdict(dict)
self.file_uid_cache = defaultdict(dict)
self.counter = 0
self.dump_freq = 10
self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation
Expand Down Expand Up @@ -208,8 +236,9 @@ def process_event(self, topicName, theEvent):

if len(self.sim_map) == 0:
if self.childProcess:
self.childProcess.stdin.close()
self.childProcess.wait()
self.childProcessStop.set()
self.childProcess.join()
self.check_send_post_responses()
self.done = True
self.services.debug('No more simulation to monitor - exiting')
time.sleep(1)
Expand Down Expand Up @@ -248,25 +277,49 @@ def send_event(self, sim_data, event_data):
except Exception:
self.services.exception("Error writing html file into USER_W3_DIR directory")
self.write_to_htmldir = False

if self.portal_url:
webmsg = json.dumps(event_data)
if self.first_event: # First time, launch sendPost.py daemon
self.parent_conn, child_conn = Pipe()
self.childProcessStop = Event()
self.childProcess = Process(target=send_post, args=(child_conn, self.childProcessStop, self.portal_url))
self.childProcess.start()
self.first_event = False

try:
if self.first_event: # First time, launch sendPost.py daemon
cmd = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sendPost.py')
python_exec = shutil.which('python3')
self.childProcess = Popen([python_exec, cmd], bufsize=128,
stdin=PIPE, stdout=PIPE,
stderr=PIPE, close_fds=True)
self.first_event = False
self.childProcess.stdin.write(('%s %s\n' %
(self.portal_url, webmsg)).encode())
self.childProcess.stdin.flush()
except Exception as e:
self.services.exception('Error transmitting event number %6d to %s : %s',
sim_data.counter, self.portal_url, str(e))
self.parent_conn.send(event_data)
except OSError:
pass

self.check_send_post_responses()

if sim_data.mpo_wid:
self.send_mpo_data(event_data, sim_data)

def check_send_post_responses(self):
while self.parent_conn.poll():
try:
code, msg = self.parent_conn.recv()
except (EOFError, OSError):
break

try:
data = json.loads(msg)
if "runid" in data:
self.services.info("Run Portal URL = %s/%s", self.portal_url, data.get('runid'))

msg = json.dumps(data)
except (TypeError, json.decoder.JSONDecodeError):
pass
if code == 200:
self.services.debug("Portal Response: %d %s", code, msg)
elif code == -1:
# disable portal, stop trying to send more data
self.portal_url = None
self.services.error("Disabling portal because: %s", msg)
else:
self.services.error("Portal Error: %d %s", code, msg)

def send_mpo_data(self, event_data, sim_data): # pragma: no cover
def md5(fname):
"Courtesy of stackoverflow 3431825"
Expand Down
58 changes: 0 additions & 58 deletions ipsframework/sendPost.py

This file was deleted.

5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,8 @@
"Operating System :: POSIX :: Linux",
],
python_requires='>=3.6',
zip_safe=True
zip_safe=True,
install_requires=[
'urllib3'
]
)
7 changes: 7 additions & 0 deletions tests/components/drivers/simple_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from ipsframework import Component


class driver(Component):
def step(self, timestamp=0.0, **keywords):
w = self.services.get_port('WORKER')
self.services.call(w, 'step', 0)
3 changes: 3 additions & 0 deletions tests/components/workers/simple_sleep.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# -------------------------------------------------------------------------------
# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
# -------------------------------------------------------------------------------
import time
from ipsframework import Component


class simple_sleep(Component):
def step(self, timestamp=0.0, **keywords):
time.sleep(1)
self.services.wait_task(
self.services.launch_task(1,
"/tmp",
"/bin/sleep",
1)
)
time.sleep(1)
Loading