From da3d0f989624741d792c6634f3f7a05eaf22489c Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 16 Dec 2021 15:23:11 -0500 Subject: [PATCH 01/17] Capture elapsed time in event log --- doc/examples/dask/simulation_log.json | 6 ++-- ipsframework/services.py | 47 ++++++++++++++++++--------- tests/new/test_dask.py | 4 +-- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/doc/examples/dask/simulation_log.json b/doc/examples/dask/simulation_log.json index edb9c841..1d7b88fd 100644 --- a/doc/examples/dask/simulation_log.json +++ b/doc/examples/dask/simulation_log.json @@ -21,18 +21,18 @@ "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.83", - "comment": "task_name = method, elasped time = 0.50s", + "comment": "task_name = method, elapsed time = 0.50s", } { "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.83", - "comment": "task_name = function, elasped time = 0.50s", + "comment": "task_name = function, elapsed time = 0.50s", } { "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.85", "state": "Running", - "comment": "task_name = binary, elasped time = 0.52s", + "comment": "task_name = binary, elapsed time = 0.52s", } diff --git a/ipsframework/services.py b/ipsframework/services.py index 72d82e57..2a72f561 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -92,7 +92,8 @@ def launch(binary, task_name, working_dir, *args, **keywords): finish_time = time.time() with worker.lock: print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elasped time = {finish_time - start_time:.2f}s"}), + "event_comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "elapsed_time": finish_time - start_time}), file=worker_event_log) except subprocess.TimeoutExpired: with worker.lock: @@ -110,7 +111,8 @@ def launch(binary, task_name, working_dir, *args, **keywords): finish_time = time.time() with worker.lock: print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elasped time = {finish_time - start_time:.2f}s"}), + "event_comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "elapsed_time": finish_time - start_time}), file=worker_event_log) return task_name, ret_val @@ -384,7 +386,8 @@ def _send_monitor_event(self, comment='', ok='True', state='Running', - event_time=None): + event_time=None, + elapsed_time=None): """ Construct and send an event populated with the component's information, *eventType*, *comment*, *ok*, *state*, and a wall time @@ -399,6 +402,8 @@ def _send_monitor_event(self, if event_time is None: event_time = time.time() portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) + if elapsed_time is not None: + portal_data['elapsed_time'] = elapsed_time portal_data['state'] = state portal_data['comment'] = comment if self.monitor_url: @@ -461,7 +466,7 @@ def call_nonblocking(self, component_id, method_name, *args, **keywords): 'init_call', method_name, *args, **keywords) call_id = self._get_service_response(msg_id, True) - self.call_targets[call_id] = (target, method_name, args) + self.call_targets[call_id] = (target, method_name, args, time.time()) return call_id def call(self, component_id, method_name, *args, **keywords): @@ -495,7 +500,7 @@ def wait_call(self, call_id, block=True): """ try: - (target, method_name, args) = self.call_targets[call_id] + (target, method_name, args, start_time) = self.call_targets[call_id] except KeyError: self.exception('Invalid call_id in wait-call() : %s', call_id) raise @@ -506,7 +511,8 @@ def wait_call(self, call_id, block=True): else str(x) for x in args] self._send_monitor_event('IPS_CALL_END', 'Target = ' + target + ':' + method_name + '(' + - str(*formatted_args) + ')') + str(*formatted_args) + ')', + elapsed_time=time.time()-start_time) del self.call_targets[call_id] return response @@ -857,14 +863,17 @@ def wait_task(self, task_id, timeout=-1, delay=1): time.sleep(delay) else: break + + finish_time = time.time(); if task_retval is None: process.kill() task_retval = process.wait() self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' % - (str(task_id), time.time() - start_time)) + (str(task_id), finish_time - start_time)) else: self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' % - (str(task_id), time.time() - start_time)) + (str(task_id), finish_time - start_time), + elapsed_time=finish_time - start_time) del self.task_map[task_id] try: @@ -1272,7 +1281,8 @@ def stage_input_files(self, input_file_list): self._send_monitor_event(eventType='IPS_STAGE_INPUTS', comment='Elapsed time = %.3f Path = %s Files = %s' % (elapsed_time, os.path.abspath(inputDir), - str(input_file_list))) + str(input_file_list)), + elapsed_time=elapsed_time) def stage_subflow_output_files(self, subflow_name='ALL'): """Gather outputs from sub-workflows. Sub-workflow output is defined @@ -1458,7 +1468,8 @@ def stage_output_files(self, timeStamp, file_list, keep_old_files=True, save_pla elapsed_time = time.time() - start_time self._send_monitor_event('IPS_STAGE_OUTPUTS', 'Elapsed time = %.3f Path = %s Files = %s' % - (elapsed_time, output_dir, str(file_list))) + (elapsed_time, output_dir, str(file_list)), + elapsed_time=elapsed_time) def save_restart_files(self, timeStamp, file_list): """ @@ -1565,7 +1576,8 @@ def stage_state(self, state_files=None): elapsed_time = time.time() - start_time self._send_monitor_event('IPS_STAGE_STATE', 'Elapsed time = %.3f files = %s Success' % - (elapsed_time, ' '.join(files))) + (elapsed_time, ' '.join(files)), + elapsed_time=elapsed_time) def update_state(self, state_files=None): """ @@ -1600,7 +1612,8 @@ def update_state(self, state_files=None): elapsed_time = time.time() - start_time self._send_monitor_event('IPS_UPDATE_STATE', 'Elapsed time = %.3f files = %s Success' % - (elapsed_time, ' '.join(files))) + (elapsed_time, ' '.join(files)), + elapsed_time=elapsed_time) def merge_current_state(self, partial_state_file, logfile=None, merge_binary=None): """ @@ -1703,13 +1716,15 @@ def process_events(self): def send_portal_event(self, event_type="COMPONENT_EVENT", event_comment="", - event_time=None): + event_time=None, + elapsed_time=None): """ Send event to web portal. """ return self._send_monitor_event(eventType=event_type, comment=event_comment, - event_time=event_time) + event_time=event_time, + elapsed_time=elapsed_time) def log(self, msg, *args): """ @@ -1785,8 +1800,10 @@ def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, self._send_monitor_event('IPS_TASK_POOL_BEGIN', 'task_pool = %s ' % task_pool_name) task_pool: TaskPool = self.task_pools[task_pool_name] retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppn, launch_interval, use_shifter) + elapsed_time = time.time() - start_time self._send_monitor_event('IPS_TASK_POOL_END', 'task_pool = %s elapsed time = %.2f S' % - (task_pool_name, time.time() - start_time)) + (task_pool_name, elapsed_time), + elapsed_time=elapsed_time) return retval def get_finished_tasks(self, task_pool_name): diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index e572565e..f7674815 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -119,7 +119,7 @@ def test_dask(tmpdir): task_end_comments = [e.get('comment')[:-4] for e in lines if e.get('eventtype') == "IPS_TASK_END"] for task in range(4): - assert f'task_name = task_{task}, elasped time = 1' in task_end_comments + assert f'task_name = task_{task}, elapsed time = 1' in task_end_comments @pytest.mark.skipif(shutil.which('shifter') is not None, @@ -222,7 +222,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): task_end_comments = [e.get('comment')[:-4] for e in lines if e.get('eventtype') == "IPS_TASK_END"] for task in range(4): - assert f'task_name = task_{task}, elasped time = 1' in task_end_comments + assert f'task_name = task_{task}, elapsed time = 1' in task_end_comments # check shifter.log file with open(str(tmpdir.join('/work/DASK__dask_worker_2').join('shifter.log')), 'r') as f: From 78982f5645e61fefd443d9f7d60256325c9883e9 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 17 Dec 2021 08:52:26 -0500 Subject: [PATCH 02/17] Capture cores in event log --- ipsframework/resourceManager.py | 32 ++++++------ ipsframework/services.py | 29 ++++++++--- ipsframework/taskManager.py | 10 ++-- tests/new/test_taskManager.py | 89 +++++++++++++++++++-------------- 4 files changed, 94 insertions(+), 66 deletions(-) diff --git a/ipsframework/resourceManager.py b/ipsframework/resourceManager.py index 2ac26f62..7a95b855 100644 --- a/ipsframework/resourceManager.py +++ b/ipsframework/resourceManager.py @@ -356,7 +356,7 @@ def get_allocation(self, comp_id, nproc, task_id, else: try: self.processes += nproc - k = 0 + cores_allocated = 0 alloc_procs = 0 node_file_entries = [] if whole_nodes: @@ -371,10 +371,10 @@ def get_allocation(self, comp_id, nproc, task_id, self.avail_nodes.remove(n) self.alloc_nodes.append(n) node_file_entries.append((n, cores)) - k += procs - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + cores_allocated += procs + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) elif whole_socks: # ------------------------------- # whole sock allocation @@ -388,7 +388,7 @@ def get_allocation(self, comp_id, nproc, task_id, whole_socks, task_id, comp_id, to_alloc) - k += len(cores) + cores_allocated += len(cores) alloc_procs = min([ppn, len(cores)]) node_file_entries.append((n, cores)) if n not in self.alloc_nodes: @@ -396,9 +396,9 @@ def get_allocation(self, comp_id, nproc, task_id, if node.avail_cores - node.total_cores == 0: self.avail_nodes.remove(n) - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) else: # ------------------------------- # single core allocation @@ -407,22 +407,22 @@ def get_allocation(self, comp_id, nproc, task_id, node = self.nodes[n] if node.avail_cores > 0: to_alloc = min([ppn, node.avail_cores, - nproc - k]) + nproc - cores_allocated]) self.fwk.debug("allocate task_id %d node %s %d cores" % (task_id, n, to_alloc)) procs, cores = node.allocate(whole_nodes, whole_socks, task_id, comp_id, to_alloc) - k += procs + cores_allocated += procs node_file_entries.append((n, cores)) if n not in self.alloc_nodes: self.alloc_nodes.append(n) if node.avail_cores - node.total_cores == 0: self.avail_nodes.remove(n) - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) except Exception: print("Available Nodes:") for nm in self.avail_nodes: @@ -443,10 +443,10 @@ def get_allocation(self, comp_id, nproc, task_id, if whole_nodes: self.report_RM_status("allocation for task %d using whole nodes" % task_id) - return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes + return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes, cores_allocated else: self.report_RM_status("allocation for task %d using partial nodes" % task_id) - return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes + return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes, cores_allocated def check_whole_node_cap(self, nproc, ppn): """ diff --git a/ipsframework/services.py b/ipsframework/services.py index 2a72f561..a09e1902 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -387,7 +387,9 @@ def _send_monitor_event(self, ok='True', state='Running', event_time=None, - elapsed_time=None): + elapsed_time=None, + procs_requested=None, + cores_allocated=None): """ Construct and send an event populated with the component's information, *eventType*, *comment*, *ok*, *state*, and a wall time @@ -409,6 +411,11 @@ def _send_monitor_event(self, if self.monitor_url: portal_data['vizurl'] = self.monitor_url.split('//')[-1] + if procs_requested is not None: + portal_data['procs_requested'] = procs_requested + if cores_allocated is not None: + portal_data['cores_allocated'] = cores_allocated + event_data = {} event_data['sim_name'] = self.sim_conf['__PORTAL_SIM_NAME'] event_data['real_sim_name'] = self.sim_name @@ -632,16 +639,18 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): 'init_task', nproc, binary_fullpath, working_dir, task_ppn, block, whole_nodes, whole_socks, task_cpp, *args) - (task_id, command, env_update) = self._get_service_response(msg_id, block=True) + (task_id, command, env_update, cores_allocated) = self._get_service_response(msg_id, block=True) except Exception: raise task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) if env_update: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}', + procs_requested=nproc, cores_allocated=cores_allocated) else: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}', + procs_requested=nproc, cores_allocated=cores_allocated) return task_id @@ -738,7 +747,7 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): if launch_interval > 0: time.sleep(launch_interval) task = queued_tasks[task_name] - (task_id, command, env_update) = allocated_tasks[task_name] + (task_id, command, env_update, cores_allocated) = allocated_tasks[task_name] tag = task.keywords.get('tag', 'None') active_tasks[task_name] = self._launch_task(task.nproc, task.working_dir, task_id, command, env_update, tag, task.keywords) @@ -746,10 +755,14 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): if env_update: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}' - f', env = {env_update}') + f', env = {env_update}', + procs_requested=task.nproc, + cores_allocated=cores_allocated) else: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', - f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}') + f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}', + procs_requested=task.nproc, + cores_allocated=cores_allocated) return active_tasks @@ -864,7 +877,7 @@ def wait_task(self, task_id, timeout=-1, delay=1): else: break - finish_time = time.time(); + finish_time = time.time() if task_retval is None: process.kill() task_retval = process.wait() diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 4a2001c5..2ef07713 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -260,9 +260,9 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, self.fwk.debug('RM: get_allocation() returned %s', str(retval)) partial_node = retval[0] if partial_node: - (nodelist, corelist, ppn, max_ppn, accurateNodes) = retval[1:] + (nodelist, corelist, ppn, max_ppn, accurateNodes, cores_allocated) = retval[1:] else: - (nodelist, ppn, max_ppn, cpp, accurateNodes) = retval[1:] + (nodelist, ppn, max_ppn, cpp, accurateNodes, cores_allocated) = retval[1:] if partial_node: nodes = ','.join(nodelist) @@ -292,7 +292,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, 'launch_cmd': cmd, 'env_update': env_update} - return (task_id, cmd, env_update) + return (task_id, cmd, env_update, cores_allocated) def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, @@ -557,14 +557,14 @@ def init_task_pool(self, init_task_msg): except BadResourceRequestException as e: self.fwk.error("There has been a fatal error, %s requested %d too many processors in task %d", caller_id, e.deficit, e.task_id) - for task_id, _, _ in ret_dict.values(): + for task_id, _, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise except ResourceRequestMismatchException as e: self.fwk.error("There has been a fatal error, %s requested too few processors per node to launch task %d (request: procs = %d, ppn = %d)", caller_id, e.task_id, e.nproc, e.ppn) - for task_id, _, _ in ret_dict.values(): + for task_id, _, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index be9f0cfe..c77fc228 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -403,82 +403,87 @@ def test_init_task_srun(tmpdir): rm.accurateNodes = True def init_final_task(nproc, tppn, tcpt=0): - task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', nproc, 'exe', '/dir', tppn, True, True, True, tcpt)) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) - return task_id, cmd + return task_id, cmd, cores_allocated - task_id, cmd = init_final_task(1, 0) + task_id, cmd, cores = init_final_task(1, 0) assert task_id == 1 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 - task_id, cmd = init_final_task(2, 0) + task_id, cmd, cores = init_final_task(2, 0) assert task_id == 2 assert cmd == "srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task(3, 0) - task_id, cmd = init_final_task(4, 0) + task_id, cmd, cores = init_final_task(4, 0) assert task_id == 4 assert cmd == "srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 4 with pytest.raises(BadResourceRequestException): init_final_task(5, 0) - task_id, cmd = init_final_task(1, 1) + task_id, cmd, cores = init_final_task(1, 1) assert task_id == 6 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 - task_id, cmd = init_final_task(2, 1) + task_id, cmd, cores = init_final_task(2, 1) assert task_id == 7 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 4 with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 2) + task_id, cmd, cores = init_final_task(1, 1, 2) assert task_id == 9 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 1) + task_id, cmd, cores = init_final_task(1, 1, 1) assert task_id == 10 assert cmd == "srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 4) + task_id, cmd, cores = init_final_task(1, 1, 4) assert task_id == 11 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 2) + task_id, cmd, cores = init_final_task(2, 1, 2) assert task_id == 12 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 1) + task_id, cmd, cores = init_final_task(2, 1, 1) assert task_id == 13 assert cmd == "srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 12) + task_id, cmd, cores = init_final_task(2, 1, 12) assert task_id == 14 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (12) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") # start two task, second should fail with Insufficient Resources depending on block - task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', - 4, 'exe', '/dir', 0, True, - True, True, 0)) + task_id, cmd, _, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + 4, 'exe', '/dir', 0, True, + True, True, 0)) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', @@ -518,80 +523,88 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): if msg is None: msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False, tcpp) for n in range(number_of_tasks)} retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) - for task_id, _, _ in retval.values(): + for task_id, _, _, _ in retval.values(): tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return retval retval = init_final_task_pool(1, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 1 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 retval = init_final_task_pool(2, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 2 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task_pool(3, 0, 1) retval = init_final_task_pool(4, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 4 assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 with pytest.raises(BadResourceRequestException): init_final_task_pool(5, 0, 1) retval = init_final_task_pool(1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 6 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 retval = init_final_task_pool(2, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 7 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(3, 1, 1) retval = init_final_task_pool(1, 0, 2) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 9 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] - assert task_id == 10 - assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] + assert cores == 2 retval = init_final_task_pool(2, 0, 2) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 11 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] assert task_id == 12 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 retval = init_final_task_pool(4, 0, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 13 assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 # now try with task_cpp set fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 15 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -599,7 +612,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 16 assert cmd == 'srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -607,7 +620,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 4) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 17 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') @@ -615,7 +628,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 18 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -623,7 +636,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 19 assert cmd == 'srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -631,7 +644,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 4) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 20 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') @@ -641,12 +654,14 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} retval = init_final_task_pool(msg=msg) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 21 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] assert task_id == 22 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 # one good task, one bad task msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), From c88a6d20d2e0864bac6e22ca6d571ad1b9ca410b Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Wed, 22 Dec 2021 13:50:46 -0500 Subject: [PATCH 03/17] Capture task target --- ipsframework/services.py | 57 +++++++++++++++++++++++------------ tests/new/test_taskManager.py | 4 +-- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index a09e1902..1fc9ab93 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -388,6 +388,9 @@ def _send_monitor_event(self, state='Running', event_time=None, elapsed_time=None, + start_time=None, + end_time=None, + target=None, procs_requested=None, cores_allocated=None): """ @@ -406,15 +409,20 @@ def _send_monitor_event(self, portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) if elapsed_time is not None: portal_data['elapsed_time'] = elapsed_time - portal_data['state'] = state - portal_data['comment'] = comment - if self.monitor_url: - portal_data['vizurl'] = self.monitor_url.split('//')[-1] - + if start_time is not None: + portal_data['start_time'] = start_time + if end_time is not None: + portal_data['end_time'] = end_time + if target is not None: + portal_data['target'] = target if procs_requested is not None: portal_data['procs_requested'] = procs_requested if cores_allocated is not None: portal_data['cores_allocated'] = cores_allocated + portal_data['state'] = state + portal_data['comment'] = comment + if self.monitor_url: + portal_data['vizurl'] = self.monitor_url.split('//')[-1] event_data = {} event_data['sim_name'] = self.sim_conf['__PORTAL_SIM_NAME'] @@ -440,7 +448,7 @@ def cleanup(self): method in the base class for components. """ - for (p, _, _) in self.task_map.values(): + for p, *_ in self.task_map.values(): try: p.kill() except Exception: @@ -514,12 +522,14 @@ def wait_call(self, call_id, block=True): msg_id = self._invoke_service(self.fwk.component_id, 'wait_call', call_id, block) response = self._get_service_response(msg_id, block=True) - formatted_args = ['%.3f' % (x) if isinstance(x, float) - else str(x) for x in args] - self._send_monitor_event('IPS_CALL_END', 'Target = ' + - target + ':' + method_name + '(' + - str(*formatted_args) + ')', - elapsed_time=time.time()-start_time) + formatted_args = ','.join('%.3f' % (x) if isinstance(x, float) + else str(x) for x in args) + target_full = f'{target}:{method_name}({formatted_args})' + self._send_monitor_event('IPS_CALL_END', 'Target = ' + target_full, + start_time=start_time, + end_time=time.time(), + elapsed_time=time.time()-start_time, + target=target_full) del self.call_targets[call_id] return response @@ -643,7 +653,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): except Exception: raise - task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) + task_id = self._launch_task(nproc, working_dir, task_id, command, cores_allocated, env_update, tag, keywords, binary, args) if env_update: self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}', @@ -654,7 +664,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): return task_id - def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, keywords): + def _launch_task(self, nproc, working_dir, task_id, command, cores_allocated, env_update, tag, keywords, binary, args): log_filename = keywords.get('logfile') timeout = keywords.get("timeout", 1.e9) @@ -700,7 +710,7 @@ def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, ke # FIXME: process Monitoring Command : ps --no-headers -o pid,state pid1 pid2 pid3 ... - self.task_map[task_id] = (process, time.time(), timeout) + self.task_map[task_id] = (process, time.time(), timeout, nproc, cores_allocated, command, binary, args) return task_id # process.pid def launch_task_pool(self, task_pool_name, launch_interval=0.0): @@ -750,7 +760,9 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): (task_id, command, env_update, cores_allocated) = allocated_tasks[task_name] tag = task.keywords.get('tag', 'None') - active_tasks[task_name] = self._launch_task(task.nproc, task.working_dir, task_id, command, env_update, tag, task.keywords) + active_tasks[task_name] = self._launch_task(task.nproc, + task.working_dir, task_id, command, cores_allocated, + env_update, tag, task.keywords, task.binary, task.args) if env_update: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', @@ -778,7 +790,7 @@ def kill_task(self, task_id): :rtype: bool """ try: - process, _, _ = self.task_map[task_id] + process, *_ = self.task_map[task_id] # TODO: process and start_time will have to be accessed as shown # below if this task can be relaunched to support FT... except KeyError: @@ -824,7 +836,7 @@ def wait_task_nonblocking(self, task_id): :return: return value of task if finished else None """ try: - process, start_time, timeout = self.task_map[task_id] + process, start_time, timeout, *_ = self.task_map[task_id] # TODO: process and start_time will have to be accessed as shown # below if this task can be relaunched to support FT... except KeyError: @@ -861,7 +873,7 @@ def wait_task(self, task_id, timeout=-1, delay=1): :return: return value of task """ try: - process, start_time, _ = self.task_map[task_id] + process, start_time, _, nproc, cores, _, binary, args = self.task_map[task_id] except KeyError: self.exception('Error: unrecognizable task_id = %s ', str(task_id)) raise @@ -886,7 +898,12 @@ def wait_task(self, task_id, timeout=-1, delay=1): else: self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' % (str(task_id), finish_time - start_time), - elapsed_time=finish_time - start_time) + start_time=start_time, + end_time=finish_time, + elapsed_time=finish_time - start_time, + procs_requested=nproc, + cores_allocated=cores, + target=f'{binary}({",".join(args)})') del self.task_map[task_id] try: diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index c77fc228..c73c8d41 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -404,8 +404,8 @@ def test_init_task_srun(tmpdir): def init_final_task(nproc, tppn, tcpt=0): task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', - nproc, 'exe', '/dir', tppn, True, - True, True, tcpt)) + nproc, 'exe', '/dir', tppn, True, + True, True, tcpt)) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return task_id, cmd, cores_allocated From 00b1838bb6e2b6258fe9618c485b022026238f8d Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 3 Feb 2022 17:23:38 -0500 Subject: [PATCH 04/17] Include origin_target --- ipsframework/ips.py | 2 ++ ipsframework/services.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 8587c754..79086ff6 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -633,6 +633,8 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): portal_data['state'] = 'Completed' portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z', time.localtime()) + portal_data['start_time'] = self.start_time + portal_data['end_time'] = time.time() event_body = {} event_body['sim_name'] = sim_name diff --git a/ipsframework/services.py b/ipsframework/services.py index 1fc9ab93..c85401d7 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -415,6 +415,7 @@ def _send_monitor_event(self, portal_data['end_time'] = end_time if target is not None: portal_data['target'] = target + portal_data['origin_target'] = f'{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}' if procs_requested is not None: portal_data['procs_requested'] = procs_requested if cores_allocated is not None: @@ -529,7 +530,7 @@ def wait_call(self, call_id, block=True): start_time=start_time, end_time=time.time(), elapsed_time=time.time()-start_time, - target=target_full) + target=target) del self.call_targets[call_id] return response From 29c8d9086b614feb62bf5055d1e7ab494b1cfc4c Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 4 Feb 2022 11:16:33 -0500 Subject: [PATCH 05/17] Add reference to origin_target --- ipsframework/component.py | 12 ++++++------ ipsframework/services.py | 8 +++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/ipsframework/component.py b/ipsframework/component.py index 33baead0..5f130d16 100644 --- a/ipsframework/component.py +++ b/ipsframework/component.py @@ -121,19 +121,19 @@ def __run__(self): self.services.log('Received Message ') sender_id = msg.sender_id call_id = msg.call_id - method_name = msg.target_method - args = msg.args + self.method_name = msg.target_method + self.args = msg.args keywords = msg.keywords formatted_args = ['%.3f' % (x) if isinstance(x, float) - else str(x) for x in args] + else str(x) for x in self.args] if keywords: formatted_args += [" %s=" % k + str(v) for (k, v) in keywords.items()] - self.services.debug('Calling method ' + method_name + + self.services.debug('Calling method ' + self.method_name + "(" + ' ,'.join(formatted_args) + ")") try: - method = getattr(self, method_name) - retval = method(*args, **keywords) + method = getattr(self, self.method_name) + retval = method(*self.args, **keywords) except Exception as e: self.services.exception('Uncaught Exception in component method.') response_msg = MethodResultMessage(self.component_id, diff --git a/ipsframework/services.py b/ipsframework/services.py index c85401d7..3fc444d6 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -415,7 +415,9 @@ def _send_monitor_event(self, portal_data['end_time'] = end_time if target is not None: portal_data['target'] = target - portal_data['origin_target'] = f'{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}' + formatted_args = ['%.3f' % (x) if isinstance(x, float) + else str(x) for x in self.component_ref.args] + portal_data['origin_target'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" if procs_requested is not None: portal_data['procs_requested'] = procs_requested if cores_allocated is not None: @@ -530,7 +532,7 @@ def wait_call(self, call_id, block=True): start_time=start_time, end_time=time.time(), elapsed_time=time.time()-start_time, - target=target) + target=target_full) del self.call_targets[call_id] return response @@ -904,7 +906,7 @@ def wait_task(self, task_id, timeout=-1, delay=1): elapsed_time=finish_time - start_time, procs_requested=nproc, cores_allocated=cores, - target=f'{binary}({",".join(args)})') + target=f'{binary} {" ".join(args)}') del self.task_map[task_id] try: From eb6393d7cfcf45179cdcc3fafe49ef5fea85a84b Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 4 Feb 2022 13:19:17 -0500 Subject: [PATCH 06/17] Separate target and operation for tasks --- ipsframework/services.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 3fc444d6..60faa933 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -391,6 +391,7 @@ def _send_monitor_event(self, start_time=None, end_time=None, target=None, + operation=None, procs_requested=None, cores_allocated=None): """ @@ -415,9 +416,11 @@ def _send_monitor_event(self, portal_data['end_time'] = end_time if target is not None: portal_data['target'] = target + portal_data['operation'] = operation formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in self.component_ref.args] - portal_data['origin_target'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" + portal_data['sender_operation'] = f"{self.component_ref.method_name}({' ,'.join(formatted_args)})" + portal_data['sender'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}" if procs_requested is not None: portal_data['procs_requested'] = procs_requested if cores_allocated is not None: @@ -532,7 +535,8 @@ def wait_call(self, call_id, block=True): start_time=start_time, end_time=time.time(), elapsed_time=time.time()-start_time, - target=target_full) + target=target, + operation=f'{method_name}({formatted_args})') del self.call_targets[call_id] return response @@ -906,7 +910,8 @@ def wait_task(self, task_id, timeout=-1, delay=1): elapsed_time=finish_time - start_time, procs_requested=nproc, cores_allocated=cores, - target=f'{binary} {" ".join(args)}') + target=binary, + operation=" ".join(args)) del self.task_map[task_id] try: From 78cc1c1484fcea3cab0aa972d208e74b8c05775a Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 10 Feb 2022 14:35:10 -0500 Subject: [PATCH 07/17] Move trace data to trace entry --- ipsframework/ips.py | 8 ++++++++ ipsframework/portalBridge.py | 3 +++ ipsframework/services.py | 29 +++++++++++++++++++---------- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 79086ff6..90818a15 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -59,6 +59,7 @@ import logging import os import time +import hashlib from ipsframework import platformspec from ipsframework.messages import Message, ServiceRequestMessage, \ ServiceResponseMessage, MethodInvokeMessage @@ -635,6 +636,13 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): time.localtime()) portal_data['start_time'] = self.start_time portal_data['end_time'] = time.time() + portal_data['trace'] = {"timestamp": int(time.time()*1e6), + "duration": int((time.time() - self.start_time)*1e6), + "localEndpoint": { + "serviceName": str(self.component_id) + }, + "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], + "id_text": str(self.component_id)} event_body = {} event_body['sim_name'] = sim_name diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 1225ada8..00d74175 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -199,6 +199,9 @@ def process_event(self, topicName, theEvent): portal_data['portal_runid'] = sim_data.portal_runid portal_data['seqnum'] = sim_data.counter + if 'trace' in portal_data: + portal_data['trace']['traceID'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + self.send_event(sim_data, portal_data) sim_data.counter += 1 self.counter += 1 diff --git a/ipsframework/services.py b/ipsframework/services.py index 60faa933..f3dfd24d 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -7,7 +7,7 @@ import os import subprocess import threading - +import hashlib import time import shutil import logging @@ -408,19 +408,28 @@ def _send_monitor_event(self, if event_time is None: event_time = time.time() portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) - if elapsed_time is not None: - portal_data['elapsed_time'] = elapsed_time + + trace = {} if start_time is not None: - portal_data['start_time'] = start_time - if end_time is not None: - portal_data['end_time'] = end_time + trace['timestamp'] = int(start_time*1e6) # convert to microsecond + if elapsed_time is not None: + trace['duration'] = end_time + elif end_time is not None: + trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond if target is not None: - portal_data['target'] = target - portal_data['operation'] = operation + trace['localEndpoint'] = {"serviceName": target} + trace['name'] = operation formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in self.component_ref.args] - portal_data['sender_operation'] = f"{self.component_ref.method_name}({' ,'.join(formatted_args)})" - portal_data['sender'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}" + trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] + trace['id_text'] = f"{target}:{operation}" + trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})".encode()).hexdigest()[:16] + trace['parentId_text'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" + trace['componentID'] = str(self.component_ref.component_id) + + if trace: + portal_data['trace'] = trace + if procs_requested is not None: portal_data['procs_requested'] = procs_requested if cores_allocated is not None: From a7ed6752d6197e2b201f2b940b324ff0ff642add Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 10 Feb 2022 16:48:10 -0500 Subject: [PATCH 08/17] Capture Framework calls to driver as events --- ipsframework/component.py | 2 ++ ipsframework/ips.py | 51 ++++++++++++++++++++++++++------------- ipsframework/services.py | 12 +++------ 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/ipsframework/component.py b/ipsframework/component.py index 5f130d16..b6c90762 100644 --- a/ipsframework/component.py +++ b/ipsframework/component.py @@ -31,6 +31,8 @@ def __init__(self, services, config): self.config = config self.start_time = 0.0 self.sys_exit = None + self.method_name = None + self.args = None for i in config.keys(): try: setattr(self, i, config[i]) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 90818a15..7351d993 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -182,7 +182,7 @@ def __init__(self, config_file_list, log_file_name, platform_file_name=None, logger.addHandler(self.ch) self.logger = logger self.verbose_debug = verbose_debug - self.outstanding_calls_list = [] + self.outstanding_calls_list = {} self.call_queue_map = {} # add the handler to the root logger @@ -422,7 +422,7 @@ def run(self): for method in ['step', 'finalize']: req_msg = ServiceRequestMessage(self.component_id, self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, None, str(comp_id), method, 0)) outstanding_sim_calls[str(comp_id)] = msg_list @@ -448,7 +448,7 @@ def run(self): self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, sim_name, str(comp_id), method, 0)) # SIMYAN: add the msg_list to the outstanding sim calls if msg_list: outstanding_sim_calls[sim_name] = msg_list @@ -461,11 +461,11 @@ def run(self): # send off first round of invocations... try: for sim_name, msg_list in outstanding_sim_calls.items(): - msg = msg_list.pop(0) + msg, sim_name, comp, method, arg = msg_list.pop(0) self.debug('Framework sending message %s ', msg.__dict__) call_id = self.task_manager.init_call(msg, manage_return=False) self.call_queue_map[call_id] = msg_list - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() except Exception: self.exception('encountered exception during fwk.run() sending first round of invocations (init of inits and fwk comps)') self.terminate_all_sims(status=Message.FAILURE) @@ -504,7 +504,14 @@ def run(self): self.task_manager.return_call(msg) continue # Message is a result from a framework invocation - self.outstanding_calls_list.remove(msg.call_id) + sim_name, comp, method, arg, start_time = self.outstanding_calls_list.pop(msg.call_id) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + eventType='IPS_CALL_END', + start_time=start_time, + end_time=time.time(), + target=comp, + operation=f'{method}({arg})') sim_msg_list = self.call_queue_map[msg.call_id] del self.call_queue_map[msg.call_id] if msg.status == Message.FAILURE: @@ -521,10 +528,10 @@ def run(self): comment = 'Simulation Ended' ok = True try: - next_call_msg = sim_msg_list.pop(0) + next_call_msg, sim_name, comp, method, arg = sim_msg_list.pop(0) call_id = self.task_manager.init_call(next_call_msg, manage_return=False) - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() self.call_queue_map[call_id] = sim_msg_list except IndexError: sim_comps = self.config_manager.get_component_map() # Get any new dynamic simulations @@ -555,16 +562,16 @@ def initiate_new_simulation(self, sim_name): req_msg = ServiceRequestMessage(self.component_id, self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, sim_name, str(comp_id), method, 0)) # send off first round of invocations... - msg = msg_list.pop(0) + msg, sim_name, comp, method, arg = msg_list.pop(0) self.debug('Framework sending message %s ', msg.__dict__) call_id = self.task_manager.init_call(msg, manage_return=False) self.call_queue_map[call_id] = msg_list - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() - def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): + def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', target=None, operation=None, start_time=None, end_time=None): """ Publish a portal monitor event to the *_IPS_MONITOR* event topic. Event topics that start with an underscore are reserved for use by the @@ -634,15 +641,25 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): portal_data['state'] = 'Completed' portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z', time.localtime()) - portal_data['start_time'] = self.start_time - portal_data['end_time'] = time.time() - portal_data['trace'] = {"timestamp": int(time.time()*1e6), + portal_data['trace'] = {"timestamp": int(self.start_time*1e6), "duration": int((time.time() - self.start_time)*1e6), "localEndpoint": { "serviceName": str(self.component_id) }, - "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], - "id_text": str(self.component_id)} + "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16]} + elif eventType == "IPS_CALL_END": + trace = {} + if start_time is not None and end_time is not None: + trace['timestamp'] = int(start_time*1e6) # convert to microsecond + trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond + if target is not None: + trace['localEndpoint'] = {"serviceName": target} + trace['name'] = operation + trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] + trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16] + + if trace: + portal_data['trace'] = trace event_body = {} event_body['sim_name'] = sim_name diff --git a/ipsframework/services.py b/ipsframework/services.py index f3dfd24d..a932201d 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -413,7 +413,7 @@ def _send_monitor_event(self, if start_time is not None: trace['timestamp'] = int(start_time*1e6) # convert to microsecond if elapsed_time is not None: - trace['duration'] = end_time + trace['duration'] = int(elapsed_time*1e6) elif end_time is not None: trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond if target is not None: @@ -422,10 +422,8 @@ def _send_monitor_event(self, formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in self.component_ref.args] trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] - trace['id_text'] = f"{target}:{operation}" - trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})".encode()).hexdigest()[:16] - trace['parentId_text'] = f"{self.component_ref.component_id.get_class_name()}@{self.component_ref.component_id.get_seq_num()}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" - trace['componentID'] = str(self.component_ref.component_id) + trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" + .encode()).hexdigest()[:16] if trace: portal_data['trace'] = trace @@ -482,9 +480,7 @@ def call_nonblocking(self, component_id, method_name, *args, **keywords): :return: call_id :rtype: int """ - target_class = component_id.get_class_name() - target_seqnum = component_id.get_seq_num() - target = target_class + '@' + str(target_seqnum) + target = str(component_id) formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in args] if keywords: From 86c195421398d662014c32f787e2ef686ad3bcfe Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 09:09:11 -0500 Subject: [PATCH 09/17] Quick fix tests --- tests/new/test_dask.py | 8 ++++---- tests/new/test_ips_framework.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index f7674815..4dc016bb 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -107,7 +107,7 @@ def test_dask(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 25 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -154,7 +154,7 @@ def test_dask_shifter_fail(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 8 + assert len(lines) == 10 assert lines[-1].get('eventtype') == "IPS_END" assert lines[-1].get('comment') == "Simulation Execution Error" @@ -210,7 +210,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 25 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -270,7 +270,7 @@ def test_dask_timeout(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 25 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 diff --git a/tests/new/test_ips_framework.py b/tests/new/test_ips_framework.py index 93bd044d..687c6af3 100644 --- a/tests/new/test_ips_framework.py +++ b/tests/new/test_ips_framework.py @@ -125,11 +125,11 @@ def test_framework_simple(tmpdir, capfd): with open(json_files[0], 'r') as json_file: json_lines = json_file.readlines() - assert len(json_lines) == 3 + assert len(json_lines) == 6 event0 = json.loads(json_lines[0]) event1 = json.loads(json_lines[1]) - event2 = json.loads(json_lines[2]) + event2 = json.loads(json_lines[5]) assert event0['eventtype'] == 'IPS_START' assert event1['eventtype'] == 'IPS_RESOURCE_ALLOC' From eb4e4d26eb96997673482675ad38cb69c7ab9b4e Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 10:59:29 -0500 Subject: [PATCH 10/17] Add trace for stage in/out/state file --- ipsframework/services.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index a932201d..fdbe2c2f 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -1325,7 +1325,10 @@ def stage_input_files(self, input_file_list): comment='Elapsed time = %.3f Path = %s Files = %s' % (elapsed_time, os.path.abspath(inputDir), str(input_file_list)), - elapsed_time=elapsed_time) + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_input_files", + operation=str(input_file_list)) def stage_subflow_output_files(self, subflow_name='ALL'): """Gather outputs from sub-workflows. Sub-workflow output is defined @@ -1512,7 +1515,10 @@ def stage_output_files(self, timeStamp, file_list, keep_old_files=True, save_pla self._send_monitor_event('IPS_STAGE_OUTPUTS', 'Elapsed time = %.3f Path = %s Files = %s' % (elapsed_time, output_dir, str(file_list)), - elapsed_time=elapsed_time) + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_output_files", + operation=str(file_list)) def save_restart_files(self, timeStamp, file_list): """ @@ -1620,7 +1626,10 @@ def stage_state(self, state_files=None): self._send_monitor_event('IPS_STAGE_STATE', 'Elapsed time = %.3f files = %s Success' % (elapsed_time, ' '.join(files)), - elapsed_time=elapsed_time) + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_state", + operation=str(files)) def update_state(self, state_files=None): """ @@ -1656,7 +1665,10 @@ def update_state(self, state_files=None): self._send_monitor_event('IPS_UPDATE_STATE', 'Elapsed time = %.3f files = %s Success' % (elapsed_time, ' '.join(files)), - elapsed_time=elapsed_time) + start_time=start_time, + elapsed_time=elapsed_time, + target="update_state", + operation=str(files)) def merge_current_state(self, partial_state_file, logfile=None, merge_binary=None): """ From 6e233db681eed938d13151a005564fb1e43fa1ed Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 11:35:07 -0500 Subject: [PATCH 11/17] Get working for dask tasks --- ipsframework/services.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index fdbe2c2f..16f5f2b7 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -77,8 +77,8 @@ def launch(binary, task_name, working_dir, *args, **keywords): cmd = f"{binary} {' '.join(map(str, args))}" with worker.lock: - print(json.dumps({"event_type": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, Target = {cmd}"}), + print(json.dumps({"eventType": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), + "comment": f"task_name = {task_name}, Target = {cmd}"}), file=worker_event_log) cmd_lst = cmd.split() @@ -91,28 +91,34 @@ def launch(binary, task_name, working_dir, *args, **keywords): ret_val = process.wait(timeout) finish_time = time.time() with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", - "elapsed_time": finish_time - start_time}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": finish_time, + "comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "start_time": start_time, + "elapsed_time": finish_time - start_time, + "target": binary, + "operation": ' '.join(map(str, args))}), file=worker_event_log) except subprocess.TimeoutExpired: with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, timed-out after {timeout}s"}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": time.time(), + "comment": f"task_name = {task_name}, timed-out after {timeout}s"}), file=worker_event_log) os.killpg(process.pid, signal.SIGKILL) ret_val = -1 else: with worker.lock: - print(json.dumps({"event_type": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, Target = {binary.__name__}({','.join(map(str, args))})"}), + print(json.dumps({"eventType": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), + "comment": f"task_name = {task_name}, Target = {binary.__name__}({','.join(map(str, args))})"}), file=worker_event_log) ret_val = binary(*args) finish_time = time.time() with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", - "elapsed_time": finish_time - start_time}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": finish_time, + "comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "start_time": start_time, + "elapsed_time": finish_time - start_time, + "target": binary.__name__, + "operation": f"({','.join(map(str, args))})"}), file=worker_event_log) return task_name, ret_val @@ -2218,7 +2224,7 @@ def get_dask_finished_tasks_status(self): events.sort(key=itemgetter('event_time')) for event in events: - self.services.send_portal_event(**event) + self.services._send_monitor_event(**event) except Exception as e: # If it fails for any other reason, make sure we can continue self.services.exception('Error while reading dask worker log files: %s', str(e)) From e1e9d381f5f9296c1f4501d2faea29fb6e9b7e4c Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 12:52:23 -0500 Subject: [PATCH 12/17] Add IPS_CALL_BEGIN events to Framework --- ipsframework/ips.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 7351d993..e08280fd 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -463,6 +463,10 @@ def run(self): for sim_name, msg_list in outstanding_sim_calls.items(): msg, sim_name, comp, method, arg = msg_list.pop(0) self.debug('Framework sending message %s ', msg.__dict__) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', + eventType='IPS_CALL_BEGIN') call_id = self.task_manager.init_call(msg, manage_return=False) self.call_queue_map[call_id] = msg_list self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() @@ -507,6 +511,7 @@ def run(self): sim_name, comp, method, arg, start_time = self.outstanding_calls_list.pop(msg.call_id) if sim_name is not None: self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', eventType='IPS_CALL_END', start_time=start_time, end_time=time.time(), @@ -529,6 +534,10 @@ def run(self): ok = True try: next_call_msg, sim_name, comp, method, arg = sim_msg_list.pop(0) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', + eventType='IPS_CALL_BEGIN') call_id = self.task_manager.init_call(next_call_msg, manage_return=False) self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() From f20e9e767d82df543aeeee2695cc2390beaf19b2 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 13:10:28 -0500 Subject: [PATCH 13/17] Fix tests --- tests/new/test_dask.py | 8 ++++---- tests/new/test_ips_framework.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index 4dc016bb..76aadfc9 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -107,7 +107,7 @@ def test_dask(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 25 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -154,7 +154,7 @@ def test_dask_shifter_fail(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 10 + assert len(lines) == 12 assert lines[-1].get('eventtype') == "IPS_END" assert lines[-1].get('comment') == "Simulation Execution Error" @@ -210,7 +210,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 25 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -270,7 +270,7 @@ def test_dask_timeout(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 25 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 diff --git a/tests/new/test_ips_framework.py b/tests/new/test_ips_framework.py index 687c6af3..89869f00 100644 --- a/tests/new/test_ips_framework.py +++ b/tests/new/test_ips_framework.py @@ -125,11 +125,11 @@ def test_framework_simple(tmpdir, capfd): with open(json_files[0], 'r') as json_file: json_lines = json_file.readlines() - assert len(json_lines) == 6 + assert len(json_lines) == 9 event0 = json.loads(json_lines[0]) event1 = json.loads(json_lines[1]) - event2 = json.loads(json_lines[5]) + event2 = json.loads(json_lines[8]) assert event0['eventtype'] == 'IPS_START' assert event1['eventtype'] == 'IPS_RESOURCE_ALLOC' From 68f1343e99ec2330d324b6eada1f449d89e03b42 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 11 Feb 2022 16:19:11 -0500 Subject: [PATCH 14/17] traceID -> traceId --- ipsframework/portalBridge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 00d74175..15ed3a34 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -200,7 +200,7 @@ def process_event(self, topicName, theEvent): portal_data['seqnum'] = sim_data.counter if 'trace' in portal_data: - portal_data['trace']['traceID'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() self.send_event(sim_data, portal_data) sim_data.counter += 1 From b3ceabf5eaf1e0c45ebe8779312ee8a8ebfcfbf4 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 25 Feb 2022 08:24:48 -0500 Subject: [PATCH 15/17] Move procs_requested/cores_allocated to trace tags --- ipsframework/services.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 16f5f2b7..009607c3 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -416,13 +416,12 @@ def _send_monitor_event(self, portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) trace = {} - if start_time is not None: + if start_time is not None and (elapsed_time is not None or end_time is not None) and target is not None and operation is not None: trace['timestamp'] = int(start_time*1e6) # convert to microsecond if elapsed_time is not None: trace['duration'] = int(elapsed_time*1e6) elif end_time is not None: trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond - if target is not None: trace['localEndpoint'] = {"serviceName": target} trace['name'] = operation formatted_args = ['%.3f' % (x) if isinstance(x, float) @@ -430,14 +429,15 @@ def _send_monitor_event(self, trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" .encode()).hexdigest()[:16] + trace['tags'] = {} + if procs_requested is not None: + trace['tags']['procs_requested'] = str(procs_requested) + if cores_allocated is not None: + trace['tags']['cores_allocated'] = str(cores_allocated) if trace: portal_data['trace'] = trace - if procs_requested is not None: - portal_data['procs_requested'] = procs_requested - if cores_allocated is not None: - portal_data['cores_allocated'] = cores_allocated portal_data['state'] = state portal_data['comment'] = comment if self.monitor_url: From 3776d0df03bedb302b781412786740901d703b37 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 3 Mar 2022 16:13:15 -0500 Subject: [PATCH 16/17] Capture total_cores in trace --- ipsframework/ips.py | 3 ++- tests/helloworld/test_helloworld.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index e08280fd..cf7a282b 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -655,7 +655,8 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', "localEndpoint": { "serviceName": str(self.component_id) }, - "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16]} + "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], + 'tags': {'total_cores': str(self.resource_manager.total_cores)}} elif eventType == "IPS_CALL_END": trace = {} if start_time is not None and end_time is not None: diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index b91af60b..de24b71a 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -259,6 +259,7 @@ def test_helloworld_task_pool_dask(tmpdir, capfd): @pytest.mark.skipif(sys.platform == 'darwin', reason="This doesn't work with macOS") +@pytest.mark.timeout(120) def test_helloworld_portal(tmpdir, capfd): data_dir = os.path.dirname(__file__) copy_config_and_replace(os.path.join(data_dir, "hello_world.ips"), tmpdir.join("hello_world.ips"), tmpdir, portal=True) From 1d96683f90d1b36eae65b0f8b7715a4ce3848c40 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 4 Mar 2022 09:36:08 -0500 Subject: [PATCH 17/17] Add tests for traces --- ipsframework/ips.py | 3 ++- ipsframework/services.py | 2 +- tests/helloworld/test_helloworld.py | 27 +++++++++++++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/ipsframework/ips.py b/ipsframework/ips.py index cf7a282b..dc197c78 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -650,6 +650,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', portal_data['state'] = 'Completed' portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z', time.localtime()) + # Zipkin json format portal_data['trace'] = {"timestamp": int(self.start_time*1e6), "duration": int((time.time() - self.start_time)*1e6), "localEndpoint": { @@ -658,7 +659,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], 'tags': {'total_cores': str(self.resource_manager.total_cores)}} elif eventType == "IPS_CALL_END": - trace = {} + trace = {} # Zipkin json format if start_time is not None and end_time is not None: trace['timestamp'] = int(start_time*1e6) # convert to microsecond trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond diff --git a/ipsframework/services.py b/ipsframework/services.py index 009607c3..9d5241a8 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -415,7 +415,7 @@ def _send_monitor_event(self, event_time = time.time() portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) - trace = {} + trace = {} # Zipkin json format if start_time is not None and (elapsed_time is not None or end_time is not None) and target is not None and operation is not None: trace['timestamp'] = int(start_time*1e6) # convert to microsecond if elapsed_time is not None: diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index de24b71a..09113b3b 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -4,6 +4,7 @@ import json import glob import socketserver +import hashlib import pytest from ipsframework import Framework, TaskPool @@ -307,7 +308,8 @@ def handle(self): framework.run() - for _ in range(8): + # just get the first 5 events + for _ in range(5): server.handle_request() captured = capfd.readouterr() @@ -340,7 +342,7 @@ def handle(self): assert '.eventlog' in exts # check data sent to portal - assert len(data) >= 6 + assert len(data) == 5 # get first event to check event = json.loads(data[0].split('\r\n')[-1]) assert event['code'] == 'Framework' @@ -349,3 +351,24 @@ def handle(self): assert event['state'] == 'Running' assert event['sim_name'] == 'Hello_world_1' assert event['seqnum'] == 0 + assert 'ips_version' in event + + # get last event to check + event = json.loads(data[-1].split('\r\n')[-1]) + assert event['code'] == 'DRIVERS_HELLO_HelloDriver' + assert event['eventtype'] == 'IPS_CALL_END' + assert event['comment'] == 'Target = Hello_world_1@HelloWorker@2:init(0.000)' + assert event['state'] == 'Running' + assert event['sim_name'] == 'Hello_world_1' + assert 'trace' in event + trace = event['trace'] + assert 'duration' in trace + assert 'timestamp' in trace + assert 'id' in trace + assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16] + assert 'traceId' in trace + assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest() + assert 'parentId' in trace + assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16] + assert 'localEndpoint' in trace + assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2'