From 646cdb771a0b09bcefafa6f895c6dc9084616d71 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Tue, 13 Sep 2022 15:01:12 -0400 Subject: [PATCH] Fix trace IDs when multiple calls to component method with same timestamp --- ipsframework/component.py | 11 +- ipsframework/ips.py | 7 +- ipsframework/services.py | 27 +++- .../components/drivers/driver_double_trace.py | 9 ++ tests/components/workers/simple_sleep.py | 14 ++ tests/helloworld/test_helloworld.py | 4 +- tests/new/test_trace.py | 136 ++++++++++++++++++ 7 files changed, 193 insertions(+), 15 deletions(-) create mode 100644 tests/components/drivers/driver_double_trace.py create mode 100644 tests/components/workers/simple_sleep.py create mode 100644 tests/new/test_trace.py diff --git a/ipsframework/component.py b/ipsframework/component.py index 3a1cf393..fd548f98 100644 --- a/ipsframework/component.py +++ b/ipsframework/component.py @@ -32,6 +32,7 @@ def __init__(self, services, config): self.__start_time = 0.0 self.__sys_exit = None self.__method_name = None + self.__call_id = 0 self.__args = None for i in config.keys(): try: @@ -122,7 +123,7 @@ def __run__(self): msg = self.__invocation_q.get() self.services.log('Received Message ') sender_id = msg.sender_id - call_id = msg.call_id + self.__call_id = msg.call_id self.__method_name = msg.target_method self.__args = msg.args keywords = msg.keywords @@ -140,12 +141,12 @@ def __run__(self): self.services.exception('Uncaught Exception in component method.') response_msg = MethodResultMessage(self.component_id, sender_id, - call_id, + self.call_id, Message.FAILURE, e) else: response_msg = MethodResultMessage(self.component_id, sender_id, - call_id, + self.call_id, Message.SUCCESS, retval) self.services.fwk_in_q.put(response_msg) @@ -169,6 +170,10 @@ def start_time(self): def method_name(self): return self.__method_name + @property + def call_id(self): + return self.__call_id + @property def args(self): return self.__args diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 75ca5205..039d18c0 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -517,7 +517,8 @@ def run(self): start_time=start_time, end_time=time.time(), target=comp, - operation=f'{method}({arg})') + operation=f'{method}({arg})', + call_id=msg.call_id) sim_msg_list = self.call_queue_map[msg.call_id] del self.call_queue_map[msg.call_id] if msg.status == Message.FAILURE: @@ -581,7 +582,7 @@ def initiate_new_simulation(self, sim_name): self.call_queue_map[call_id] = msg_list self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() - def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None): + def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None, call_id=0): """ Publish a portal monitor event to the *_IPS_MONITOR* event topic. Event topics that start with an underscore are reserved for use by the @@ -674,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta if target is not None: trace['localEndpoint'] = {"serviceName": target} trace['name'] = operation - trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] + trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16] trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16] if trace: diff --git a/ipsframework/services.py b/ipsframework/services.py index b923d413..711061a1 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -399,7 +399,8 @@ def _send_monitor_event(self, target=None, operation=None, procs_requested=None, - cores_allocated=None): + cores_allocated=None, + call_id=0): """ Construct and send an event populated with the component's information, *eventType*, *comment*, *ok*, *state*, and a wall time @@ -427,9 +428,10 @@ def _send_monitor_event(self, trace['name'] = operation formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in self.component_ref.args] - trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] - trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" - .encode()).hexdigest()[:16] + trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16] + trace['parentId'] = hashlib.md5( + f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)}):{self.component_ref.call_id}" + .encode()).hexdigest()[:16] trace['tags'] = {} if procs_requested is not None: trace['tags']['procs_requested'] = str(procs_requested) @@ -549,7 +551,8 @@ def wait_call(self, call_id, block=True): end_time=time.time(), elapsed_time=time.time()-start_time, target=target, - operation=f'{method_name}({formatted_args})') + operation=f'{method_name}({formatted_args})', + call_id=call_id) except Exception as e: self._send_monitor_event('IPS_CALL_END', f'Error: "{e}" Target = {target_full}', @@ -558,6 +561,7 @@ def wait_call(self, call_id, block=True): elapsed_time=time.time()-start_time, target=target, operation=f'{method_name}({formatted_args})', + call_id=call_id, ok=False) raise @@ -929,7 +933,15 @@ def wait_task(self, task_id, timeout=-1, delay=1): process.kill() task_retval = process.wait() self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' % - (str(task_id), finish_time - start_time)) + (str(task_id), finish_time - start_time), + start_time=start_time, + end_time=finish_time, + elapsed_time=finish_time - start_time, + procs_requested=nproc, + cores_allocated=cores, + target=binary, + operation=" ".join(args), + call_id=task_id) else: self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' % (str(task_id), finish_time - start_time), @@ -939,7 +951,8 @@ def wait_task(self, task_id, timeout=-1, delay=1): procs_requested=nproc, cores_allocated=cores, target=binary, - operation=" ".join(args)) + operation=" ".join(args), + call_id=task_id) del self.task_map[task_id] try: diff --git a/tests/components/drivers/driver_double_trace.py b/tests/components/drivers/driver_double_trace.py new file mode 100644 index 00000000..4c13a4e1 --- /dev/null +++ b/tests/components/drivers/driver_double_trace.py @@ -0,0 +1,9 @@ +from ipsframework import Component + + +class driver(Component): + def step(self, timestamp=0.0, **keywords): + w = self.services.get_port('WORKER') + # call the same worker step twice to check that the trace is correct + self.services.call(w, 'step', 0) + self.services.call(w, 'step', 0) diff --git a/tests/components/workers/simple_sleep.py b/tests/components/workers/simple_sleep.py new file mode 100644 index 00000000..97a357c6 --- /dev/null +++ b/tests/components/workers/simple_sleep.py @@ -0,0 +1,14 @@ +# ------------------------------------------------------------------------------- +# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. +# ------------------------------------------------------------------------------- +from ipsframework import Component + + +class simple_sleep(Component): + def step(self, timestamp=0.0, **keywords): + self.services.wait_task( + self.services.launch_task(1, + "/tmp", + "/bin/sleep", + 1) + ) diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index b29408c7..e3cee99f 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -371,10 +371,10 @@ def handle(self): assert 'duration' in trace assert 'timestamp' in trace assert 'id' in trace - assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16] + assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000):7'.encode()).hexdigest()[:16] assert 'traceId' in trace assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest() assert 'parentId' in trace - assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16] + assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0):5'.encode()).hexdigest()[:16] assert 'localEndpoint' in trace assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2' diff --git a/tests/new/test_trace.py b/tests/new/test_trace.py new file mode 100644 index 00000000..d30e3b19 --- /dev/null +++ b/tests/new/test_trace.py @@ -0,0 +1,136 @@ +import glob +import json +import hashlib +from ipsframework import Framework + + +def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False): + platform_file = tmpdir.join('platform.conf') + + platform = """MPIRUN = eval +NODE_DETECTION = manual +CORES_PER_NODE = 2 +SOCKETS_PER_NODE = 1 +NODE_ALLOCATION_MODE = shared +HOST = +SCRATCH = +""" + + with open(platform_file, 'w') as f: + f.write(platform) + + config_file = tmpdir.join('ips.config') + + config = f"""RUN_COMMENT = trace testing +SIM_NAME = trace +LOG_FILE = {str(tmpdir)}/sim.log +LOG_LEVEL = INFO +SIM_ROOT = {str(tmpdir)} +SIMULATION_MODE = NORMAL +[PORTS] + NAMES = DRIVER WORKER + [[DRIVER]] + IMPLEMENTATION = DRIVER + [[WORKER]] + IMPLEMENTATION = WORKER +[DRIVER] + CLASS = DRIVER + SUB_CLASS = + NAME = driver + BIN_PATH = + NPROC = 1 + INPUT_FILES = + OUTPUT_FILES = + SCRIPT = + MODULE = components.drivers.driver_double_trace +[WORKER] + CLASS = WORKER + SUB_CLASS = + NAME = simple_sleep + NPROC = 1 + BIN_PATH = + INPUT_FILES = + OUTPUT_FILES = + SCRIPT = + MODULE = components.workers.simple_sleep +""" + + with open(config_file, 'w') as f: + f.write(config) + + return platform_file, config_file + + +def test_trace_info(tmpdir): + platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1) + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check simulation_log, make sure it includes events from dask tasks + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + lines = json_file.readlines() + lines = [json.loads(line.strip()) for line in lines] + assert len(lines) == 17 + + portal_runid = lines[0]['portal_runid'] + + traces = [e['trace'] for e in lines if "trace" in e] + + assert len(traces) == 8 + + call_ids = [5, 1, 8, 2, 9, 7, 10, None] + service_names = ['trace@driver@1', + '/bin/sleep', + 'trace@simple_sleep@2', + '/bin/sleep', + 'trace@simple_sleep@2', + 'trace@driver@1', + 'trace@driver@1', + 'trace@FRAMEWORK@Framework@0'] + names = ['init(0)', + '1', + 'step(0)', + '1', + 'step(0)', + 'step(0)', + 'finalize(0)', + None] + tags = [None, + {"procs_requested": "1", "cores_allocated": "1"}, + {}, + {"procs_requested": "1", "cores_allocated": "1"}, + {}, + None, + None, + {'total_cores': '2'}] + parents = [7, 2, 5, 4, 5, 7, 7, None] + + for n, trace in enumerate(traces): + assert isinstance(trace['timestamp'], int) + assert isinstance(trace['duration'], int) + assert trace['traceId'] == hashlib.md5(portal_runid.encode()).hexdigest() + assert trace['localEndpoint']['serviceName'] == service_names[n] + assert "id" in trace + assert trace.get('tags') == tags[n] + + if names[n]: + assert trace['name'] == names[n] + assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}:{trace['name']}:{call_ids[n]}".encode()).hexdigest()[:16] + else: + assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}".encode()).hexdigest()[:16] + + if parents[n]: + if names[parents[n]]: + assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}:{names[parents[n]]}:{call_ids[parents[n]]}".encode()).hexdigest()[:16] + else: + assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}".encode()).hexdigest()[:16]