Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions ipsframework/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, services, config):
self.__start_time = 0.0
self.__sys_exit = None
self.__method_name = None
self.__call_id = 0
self.__args = None
for i in config.keys():
try:
Expand Down Expand Up @@ -122,7 +123,7 @@ def __run__(self):
msg = self.__invocation_q.get()
self.services.log('Received Message ')
sender_id = msg.sender_id
call_id = msg.call_id
self.__call_id = msg.call_id
self.__method_name = msg.target_method
self.__args = msg.args
keywords = msg.keywords
Expand All @@ -140,12 +141,12 @@ def __run__(self):
self.services.exception('Uncaught Exception in component method.')
response_msg = MethodResultMessage(self.component_id,
sender_id,
call_id,
self.call_id,
Message.FAILURE, e)
else:
response_msg = MethodResultMessage(self.component_id,
sender_id,
call_id,
self.call_id,
Message.SUCCESS, retval)
self.services.fwk_in_q.put(response_msg)

Expand All @@ -169,6 +170,10 @@ def start_time(self):
def method_name(self):
return self.__method_name

@property
def call_id(self):
    """Return the value of the private ``__call_id`` attribute.

    The attribute is initialised to 0 in ``__init__`` and assigned from
    ``msg.call_id`` when ``__run__`` takes a message off the invocation
    queue.
    """
    return self.__call_id

@property
def args(self):
    """Return the value of the private ``__args`` attribute.

    The attribute is initialised to ``None`` in ``__init__`` and assigned
    from ``msg.args`` when ``__run__`` takes a message off the invocation
    queue.
    """
    return self.__args
Expand Down
7 changes: 4 additions & 3 deletions ipsframework/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ def run(self):
start_time=start_time,
end_time=time.time(),
target=comp,
operation=f'{method}({arg})')
operation=f'{method}({arg})',
call_id=msg.call_id)
sim_msg_list = self.call_queue_map[msg.call_id]
del self.call_queue_map[msg.call_id]
if msg.status == Message.FAILURE:
Expand Down Expand Up @@ -581,7 +582,7 @@ def initiate_new_simulation(self, sim_name):
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()

def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None):
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None, call_id=0):
"""
Publish a portal monitor event to the *_IPS_MONITOR* event topic.
Event topics that start with an underscore are reserved for use by the
Expand Down Expand Up @@ -674,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
if target is not None:
trace['localEndpoint'] = {"serviceName": target}
trace['name'] = operation
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16]

if trace:
Expand Down
27 changes: 20 additions & 7 deletions ipsframework/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ def _send_monitor_event(self,
target=None,
operation=None,
procs_requested=None,
cores_allocated=None):
cores_allocated=None,
call_id=0):
"""
Construct and send an event populated with the component's
information, *eventType*, *comment*, *ok*, *state*, and a wall time
Expand Down Expand Up @@ -427,9 +428,10 @@ def _send_monitor_event(self,
trace['name'] = operation
formatted_args = ['%.3f' % (x) if isinstance(x, float)
else str(x) for x in self.component_ref.args]
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})"
.encode()).hexdigest()[:16]
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
trace['parentId'] = hashlib.md5(
f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)}):{self.component_ref.call_id}"
.encode()).hexdigest()[:16]
trace['tags'] = {}
if procs_requested is not None:
trace['tags']['procs_requested'] = str(procs_requested)
Expand Down Expand Up @@ -549,7 +551,8 @@ def wait_call(self, call_id, block=True):
end_time=time.time(),
elapsed_time=time.time()-start_time,
target=target,
operation=f'{method_name}({formatted_args})')
operation=f'{method_name}({formatted_args})',
call_id=call_id)
except Exception as e:
self._send_monitor_event('IPS_CALL_END',
f'Error: "{e}" Target = {target_full}',
Expand All @@ -558,6 +561,7 @@ def wait_call(self, call_id, block=True):
elapsed_time=time.time()-start_time,
target=target,
operation=f'{method_name}({formatted_args})',
call_id=call_id,
ok=False)
raise

Expand Down Expand Up @@ -929,7 +933,15 @@ def wait_task(self, task_id, timeout=-1, delay=1):
process.kill()
task_retval = process.wait()
self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' %
(str(task_id), finish_time - start_time))
(str(task_id), finish_time - start_time),
start_time=start_time,
end_time=finish_time,
elapsed_time=finish_time - start_time,
procs_requested=nproc,
cores_allocated=cores,
target=binary,
operation=" ".join(args),
call_id=task_id)
else:
self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' %
(str(task_id), finish_time - start_time),
Expand All @@ -939,7 +951,8 @@ def wait_task(self, task_id, timeout=-1, delay=1):
procs_requested=nproc,
cores_allocated=cores,
target=binary,
operation=" ".join(args))
operation=" ".join(args),
call_id=task_id)

del self.task_map[task_id]
try:
Expand Down
9 changes: 9 additions & 0 deletions tests/components/drivers/driver_double_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ipsframework import Component


class driver(Component):
    """Test driver that invokes the WORKER port's ``step`` method twice."""

    def step(self, timestamp=0.0, **keywords):
        """Call ``step`` on the WORKER port two times with the argument 0.

        The identical call is issued twice on purpose so that the trace
        produced for a repeated operation can be checked for correctness.
        *timestamp* and *keywords* are accepted but not used here.
        """
        worker = self.services.get_port('WORKER')
        for _ in range(2):
            self.services.call(worker, 'step', 0)
14 changes: 14 additions & 0 deletions tests/components/workers/simple_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -------------------------------------------------------------------------------
# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
# -------------------------------------------------------------------------------
from ipsframework import Component


class simple_sleep(Component):
    """Worker component whose ``step`` runs ``/bin/sleep`` as a task."""

    def step(self, timestamp=0.0, **keywords):
        """Launch one ``/bin/sleep`` task and block until it has finished.

        The task is started through ``services.launch_task`` with the
        arguments ``(1, "/tmp", "/bin/sleep", 1)`` and the id it returns is
        handed straight to ``services.wait_task``. *timestamp* and
        *keywords* are accepted but not used here.
        """
        task_id = self.services.launch_task(1, "/tmp", "/bin/sleep", 1)
        self.services.wait_task(task_id)
4 changes: 2 additions & 2 deletions tests/helloworld/test_helloworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ def handle(self):
assert 'duration' in trace
assert 'timestamp' in trace
assert 'id' in trace
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16]
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000):7'.encode()).hexdigest()[:16]
assert 'traceId' in trace
assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest()
assert 'parentId' in trace
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16]
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0):5'.encode()).hexdigest()[:16]
assert 'localEndpoint' in trace
assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2'
136 changes: 136 additions & 0 deletions tests/new/test_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import glob
import json
import hashlib
from ipsframework import Framework


def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False):
    """Write ``platform.conf`` and ``ips.config`` into *tmpdir*.

    *tmpdir* must provide a ``join(name)`` method returning a path that
    ``open`` accepts, and ``str(tmpdir)`` must give the directory path; that
    path is substituted into the ``LOG_FILE`` and ``SIM_ROOT`` entries of the
    simulation config.

    The remaining parameters (*timeout*, *logfile*, *errfile*, *nproc*,
    *exe*, *value*, *shifter*) are accepted but not used by this function.

    Returns the tuple ``(platform_file, config_file)`` of the two paths
    obtained from ``tmpdir.join``.
    """
    sim_root = str(tmpdir)

    platform_text = """MPIRUN = eval
NODE_DETECTION = manual
CORES_PER_NODE = 2
SOCKETS_PER_NODE = 1
NODE_ALLOCATION_MODE = shared
HOST =
SCRATCH =
"""

    config_text = f"""RUN_COMMENT = trace testing
SIM_NAME = trace
LOG_FILE = {sim_root}/sim.log
LOG_LEVEL = INFO
SIM_ROOT = {sim_root}
SIMULATION_MODE = NORMAL
[PORTS]
NAMES = DRIVER WORKER
[[DRIVER]]
IMPLEMENTATION = DRIVER
[[WORKER]]
IMPLEMENTATION = WORKER
[DRIVER]
CLASS = DRIVER
SUB_CLASS =
NAME = driver
BIN_PATH =
NPROC = 1
INPUT_FILES =
OUTPUT_FILES =
SCRIPT =
MODULE = components.drivers.driver_double_trace
[WORKER]
CLASS = WORKER
SUB_CLASS =
NAME = simple_sleep
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT =
MODULE = components.workers.simple_sleep
"""

    platform_file = tmpdir.join('platform.conf')
    config_file = tmpdir.join('ips.config')

    # Platform file first, then the simulation config.
    for target, text in ((platform_file, platform_text), (config_file, config_text)):
        with open(target, 'w') as handle:
            handle.write(text)

    return platform_file, config_file


def test_trace_info(tmpdir):
    """Run the double-call driver simulation and verify the emitted traces.

    The simulation is configured by ``write_basic_config_and_platform_files``
    and executed through ``Framework.run``. The single JSON event log found
    under ``simulation_log`` must contain 17 events, 8 of which carry a
    ``trace`` entry. Each trace is compared against the expected tables
    below (service name, operation name, call id, tags, parent span).
    """
    platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1)

    framework = Framework(config_file_list=[str(config_file)],
                          log_file_name=str(tmpdir.join('ips.log')),
                          platform_file_name=str(platform_file),
                          debug=None,
                          verbose_debug=None,
                          cmd_nodes=0,
                          cmd_ppn=0)

    framework.run()

    # check simulation_log, make sure it holds the expected number of events
    json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json")))
    assert len(json_files) == 1
    with open(json_files[0], 'r') as json_file:
        lines = [json.loads(line.strip()) for line in json_file]
    assert len(lines) == 17

    portal_runid = lines[0]['portal_runid']

    traces = [e['trace'] for e in lines if "trace" in e]

    assert len(traces) == 8

    # Expected values, one entry per trace in the order the traces appear in
    # the log. ``None`` in ``names`` marks a span whose id is derived from the
    # service name alone; ``None`` in ``parents`` marks a span for which no
    # parent is checked.
    call_ids = [5, 1, 8, 2, 9, 7, 10, None]
    service_names = ['trace@driver@1',
                     '/bin/sleep',
                     'trace@simple_sleep@2',
                     '/bin/sleep',
                     'trace@simple_sleep@2',
                     'trace@driver@1',
                     'trace@driver@1',
                     'trace@FRAMEWORK@Framework@0']
    names = ['init(0)',
             '1',
             'step(0)',
             '1',
             'step(0)',
             'step(0)',
             'finalize(0)',
             None]
    tags = [None,
            {"procs_requested": "1", "cores_allocated": "1"},
            {},
            {"procs_requested": "1", "cores_allocated": "1"},
            {},
            None,
            None,
            {'total_cores': '2'}]
    # Index (into the tables above) of each span's expected parent span.
    parents = [7, 2, 5, 4, 5, 7, 7, None]

    def expected_span_id(index):
        """Return the expected 16-character span id for table row *index*."""
        if names[index]:
            key = f"{service_names[index]}:{names[index]}:{call_ids[index]}"
        else:
            key = f"{service_names[index]}"
        return hashlib.md5(key.encode()).hexdigest()[:16]

    expected_trace_id = hashlib.md5(portal_runid.encode()).hexdigest()

    for n, trace in enumerate(traces):
        assert isinstance(trace['timestamp'], int)
        assert isinstance(trace['duration'], int)
        assert trace['traceId'] == expected_trace_id
        assert trace['localEndpoint']['serviceName'] == service_names[n]
        assert "id" in trace
        assert trace.get('tags') == tags[n]

        if names[n]:
            assert trace['name'] == names[n]
        assert trace['id'] == expected_span_id(n)

        # Compare against None explicitly: a parent index of 0 is a valid
        # table row and must not be skipped by a truthiness test.
        if parents[n] is not None:
            assert trace['parentId'] == expected_span_id(parents[n])