diff --git a/doc/examples/dask/simulation_log.json b/doc/examples/dask/simulation_log.json index edb9c841..1d7b88fd 100644 --- a/doc/examples/dask/simulation_log.json +++ b/doc/examples/dask/simulation_log.json @@ -21,18 +21,18 @@ "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.83", - "comment": "task_name = method, elasped time = 0.50s", + "comment": "task_name = method, elapsed time = 0.50s", } { "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.83", - "comment": "task_name = function, elasped time = 0.50s", + "comment": "task_name = function, elapsed time = 0.50s", } { "code": "DASK_WORKER__DaskWorker", "eventtype": "IPS_TASK_END", "walltime": "2.85", "state": "Running", - "comment": "task_name = binary, elasped time = 0.52s", + "comment": "task_name = binary, elapsed time = 0.52s", } diff --git a/ipsframework/component.py b/ipsframework/component.py index 33baead0..b6c90762 100644 --- a/ipsframework/component.py +++ b/ipsframework/component.py @@ -31,6 +31,8 @@ def __init__(self, services, config): self.config = config self.start_time = 0.0 self.sys_exit = None + self.method_name = None + self.args = None for i in config.keys(): try: setattr(self, i, config[i]) @@ -121,19 +123,19 @@ def __run__(self): self.services.log('Received Message ') sender_id = msg.sender_id call_id = msg.call_id - method_name = msg.target_method - args = msg.args + self.method_name = msg.target_method + self.args = msg.args keywords = msg.keywords formatted_args = ['%.3f' % (x) if isinstance(x, float) - else str(x) for x in args] + else str(x) for x in self.args] if keywords: formatted_args += [" %s=" % k + str(v) for (k, v) in keywords.items()] - self.services.debug('Calling method ' + method_name + + self.services.debug('Calling method ' + self.method_name + "(" + ' ,'.join(formatted_args) + ")") try: - method = getattr(self, method_name) - retval = method(*args, **keywords) + method = getattr(self, self.method_name) + retval = method(*self.args, **keywords) except Exception as e: self.services.exception('Uncaught Exception in component method.') response_msg = MethodResultMessage(self.component_id, diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 8587c754..dc197c78 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -59,6 +59,7 @@ import logging import os import time +import hashlib from ipsframework import platformspec from ipsframework.messages import Message, ServiceRequestMessage, \ ServiceResponseMessage, MethodInvokeMessage @@ -181,7 +182,7 @@ def __init__(self, config_file_list, log_file_name, platform_file_name=None, logger.addHandler(self.ch) self.logger = logger self.verbose_debug = verbose_debug - self.outstanding_calls_list = [] + self.outstanding_calls_list = {} self.call_queue_map = {} # add the handler to the root logger @@ -421,7 +422,7 @@ def run(self): for method in ['step', 'finalize']: req_msg = ServiceRequestMessage(self.component_id, self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, None, str(comp_id), method, 0)) outstanding_sim_calls[str(comp_id)] = msg_list @@ -447,7 +448,7 @@ def run(self): self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, sim_name, str(comp_id), method, 0)) # SIMYAN: add the msg_list to the outstanding sim calls if msg_list: outstanding_sim_calls[sim_name] = msg_list @@ -460,11 +461,15 @@ def run(self): # send off first round of invocations... try: for sim_name, msg_list in outstanding_sim_calls.items(): - msg = msg_list.pop(0) + msg, sim_name, comp, method, arg = msg_list.pop(0) self.debug('Framework sending message %s ', msg.__dict__) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', + eventType='IPS_CALL_BEGIN') call_id = self.task_manager.init_call(msg, manage_return=False) self.call_queue_map[call_id] = msg_list - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() except Exception: self.exception('encountered exception during fwk.run() sending first round of invocations (init of inits and fwk comps)') self.terminate_all_sims(status=Message.FAILURE) @@ -503,7 +508,15 @@ def run(self): self.task_manager.return_call(msg) continue # Message is a result from a framework invocation - self.outstanding_calls_list.remove(msg.call_id) + sim_name, comp, method, arg, start_time = self.outstanding_calls_list.pop(msg.call_id) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', + eventType='IPS_CALL_END', + start_time=start_time, + end_time=time.time(), + target=comp, + operation=f'{method}({arg})') sim_msg_list = self.call_queue_map[msg.call_id] del self.call_queue_map[msg.call_id] if msg.status == Message.FAILURE: @@ -520,10 +533,14 @@ def run(self): comment = 'Simulation Ended' ok = True try: - next_call_msg = sim_msg_list.pop(0) + next_call_msg, sim_name, comp, method, arg = sim_msg_list.pop(0) + if sim_name is not None: + self._send_monitor_event(sim_name=sim_name, + comment=f'Target = {comp}:{method}({arg})', + eventType='IPS_CALL_BEGIN') call_id = self.task_manager.init_call(next_call_msg, manage_return=False) - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() self.call_queue_map[call_id] = sim_msg_list except IndexError: sim_comps = self.config_manager.get_component_map() # Get any new dynamic simulations @@ -554,16 +571,16 @@ def initiate_new_simulation(self, sim_name): req_msg = ServiceRequestMessage(self.component_id, self.component_id, comp_id, 'init_call', method, 0) - msg_list.append(req_msg) + msg_list.append((req_msg, sim_name, str(comp_id), method, 0)) # send off first round of invocations... - msg = msg_list.pop(0) + msg, sim_name, comp, method, arg = msg_list.pop(0) self.debug('Framework sending message %s ', msg.__dict__) call_id = self.task_manager.init_call(msg, manage_return=False) self.call_queue_map[call_id] = msg_list - self.outstanding_calls_list.append(call_id) + self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time() - def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): + def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', target=None, operation=None, start_time=None, end_time=None): """ Publish a portal monitor event to the *_IPS_MONITOR* event topic. Event topics that start with an underscore are reserved for use by the @@ -633,6 +650,27 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'): portal_data['state'] = 'Completed' portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z', time.localtime()) + # Zipkin json format + portal_data['trace'] = {"timestamp": int(self.start_time*1e6), + "duration": int((time.time() - self.start_time)*1e6), + "localEndpoint": { + "serviceName": str(self.component_id) + }, + "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], + 'tags': {'total_cores': str(self.resource_manager.total_cores)}} + elif eventType == "IPS_CALL_END": + trace = {} # Zipkin json format + if start_time is not None and end_time is not None: + trace['timestamp'] = int(start_time*1e6) # convert to microsecond + trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond + if target is not None: + trace['localEndpoint'] = {"serviceName": target} + trace['name'] = operation + trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] + trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16] + + if trace: + portal_data['trace'] = trace event_body = {} event_body['sim_name'] = sim_name diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 1225ada8..15ed3a34 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -199,6 +199,9 @@ def process_event(self, topicName, theEvent): portal_data['portal_runid'] = sim_data.portal_runid portal_data['seqnum'] = sim_data.counter + if 'trace' in portal_data: + portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + self.send_event(sim_data, portal_data) sim_data.counter += 1 self.counter += 1 diff --git a/ipsframework/resourceManager.py b/ipsframework/resourceManager.py index 2ac26f62..7a95b855 100644 --- a/ipsframework/resourceManager.py +++ b/ipsframework/resourceManager.py @@ -356,7 +356,7 @@ def get_allocation(self, comp_id, nproc, task_id, else: try: self.processes += nproc - k = 0 + cores_allocated = 0 alloc_procs = 0 node_file_entries = [] if whole_nodes: @@ -371,10 +371,10 @@ def get_allocation(self, comp_id, nproc, task_id, self.avail_nodes.remove(n) self.alloc_nodes.append(n) node_file_entries.append((n, cores)) - k += procs - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + cores_allocated += procs + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) elif whole_socks: # ------------------------------- # whole sock allocation @@ -388,7 +388,7 @@ def get_allocation(self, comp_id, nproc, task_id, whole_socks, task_id, comp_id, to_alloc) - k += len(cores) + cores_allocated += len(cores) alloc_procs = min([ppn, len(cores)]) node_file_entries.append((n, cores)) if n not in self.alloc_nodes: @@ -396,9 +396,9 @@ def get_allocation(self, comp_id, nproc, task_id, if node.avail_cores - node.total_cores == 0: self.avail_nodes.remove(n) - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) else: # ------------------------------- # single core allocation @@ -407,22 +407,22 @@ def get_allocation(self, comp_id, nproc, task_id, node = self.nodes[n] if node.avail_cores > 0: to_alloc = min([ppn, node.avail_cores, - nproc - k]) + nproc - cores_allocated]) self.fwk.debug("allocate task_id %d node %s %d cores" % (task_id, n, to_alloc)) procs, cores = node.allocate(whole_nodes, whole_socks, task_id, comp_id, to_alloc) - k += procs + cores_allocated += procs node_file_entries.append((n, cores)) if n not in self.alloc_nodes: self.alloc_nodes.append(n) if node.avail_cores - node.total_cores == 0: self.avail_nodes.remove(n) - self.alloc_cores += k - self.avail_cores -= k - self.active_tasks.update({task_id: (comp_id, nproc, k)}) + self.alloc_cores += cores_allocated + self.avail_cores -= cores_allocated + self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)}) except Exception: print("Available Nodes:") for nm in self.avail_nodes: @@ -443,10 +443,10 @@ def get_allocation(self, comp_id, nproc, task_id, if whole_nodes: self.report_RM_status("allocation for task %d using whole nodes" % task_id) - return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes + return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes, cores_allocated else: self.report_RM_status("allocation for task %d using partial nodes" % task_id) - return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes + return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes, cores_allocated def check_whole_node_cap(self, nproc, ppn): """ diff --git a/ipsframework/services.py b/ipsframework/services.py index 72d82e57..9d5241a8 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -7,7 +7,7 @@ import os import subprocess import threading - +import hashlib import time import shutil import logging @@ -77,8 +77,8 @@ def launch(binary, task_name, working_dir, *args, **keywords): cmd = f"{binary} {' '.join(map(str, args))}" with worker.lock: - print(json.dumps({"event_type": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, Target = {cmd}"}), + print(json.dumps({"eventType": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), + "comment": f"task_name = {task_name}, Target = {cmd}"}), file=worker_event_log) cmd_lst = cmd.split() @@ -91,26 +91,34 @@ def launch(binary, task_name, working_dir, *args, **keywords): ret_val = process.wait(timeout) finish_time = time.time() with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elasped time = {finish_time - start_time:.2f}s"}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": finish_time, + "comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "start_time": start_time, + "elapsed_time": finish_time - start_time, + "target": binary, + "operation": ' '.join(map(str, args))}), file=worker_event_log) except subprocess.TimeoutExpired: with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, timed-out after {timeout}s"}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": time.time(), + "comment": f"task_name = {task_name}, timed-out after {timeout}s"}), file=worker_event_log) os.killpg(process.pid, signal.SIGKILL) ret_val = -1 else: with worker.lock: - print(json.dumps({"event_type": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), - "event_comment": f"task_name = {task_name}, Target = {binary.__name__}({','.join(map(str, args))})"}), + print(json.dumps({"eventType": "IPS_LAUNCH_DASK_TASK", "event_time": time.time(), + "comment": f"task_name = {task_name}, Target = {binary.__name__}({','.join(map(str, args))})"}), file=worker_event_log) ret_val = binary(*args) finish_time = time.time() with worker.lock: - print(json.dumps({"event_type": "IPS_TASK_END", "event_time": finish_time, - "event_comment": f"task_name = {task_name}, elasped time = {finish_time - start_time:.2f}s"}), + print(json.dumps({"eventType": "IPS_TASK_END", "event_time": finish_time, + "comment": f"task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s", + "start_time": start_time, + "elapsed_time": finish_time - start_time, + "target": binary.__name__, + "operation": f"({','.join(map(str, args))})"}), file=worker_event_log) return task_name, ret_val @@ -384,7 +392,14 @@ def _send_monitor_event(self, comment='', ok='True', state='Running', - event_time=None): + event_time=None, + elapsed_time=None, + start_time=None, + end_time=None, + target=None, + operation=None, + procs_requested=None, + cores_allocated=None): """ Construct and send an event populated with the component's information, *eventType*, *comment*, *ok*, *state*, and a wall time @@ -399,6 +414,30 @@ def _send_monitor_event(self, if event_time is None: event_time = time.time() portal_data['walltime'] = '%.2f' % (event_time - self.component_ref.start_time) + + trace = {} # Zipkin json format + if start_time is not None and (elapsed_time is not None or end_time is not None) and target is not None and operation is not None: + trace['timestamp'] = int(start_time*1e6) # convert to microsecond + if elapsed_time is not None: + trace['duration'] = int(elapsed_time*1e6) + elif end_time is not None: + trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond + trace['localEndpoint'] = {"serviceName": target} + trace['name'] = operation + formatted_args = ['%.3f' % (x) if isinstance(x, float) + else str(x) for x in self.component_ref.args] + trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] + trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})" + .encode()).hexdigest()[:16] + trace['tags'] = {} + if procs_requested is not None: + trace['tags']['procs_requested'] = str(procs_requested) + if cores_allocated is not None: + trace['tags']['cores_allocated'] = str(cores_allocated) + + if trace: + portal_data['trace'] = trace + portal_data['state'] = state portal_data['comment'] = comment if self.monitor_url: @@ -428,7 +467,7 @@ def cleanup(self): method in the base class for components. """ - for (p, _, _) in self.task_map.values(): + for p, *_ in self.task_map.values(): try: p.kill() except Exception: @@ -447,9 +486,7 @@ def call_nonblocking(self, component_id, method_name, *args, **keywords): :return: call_id :rtype: int """ - target_class = component_id.get_class_name() - target_seqnum = component_id.get_seq_num() - target = target_class + '@' + str(target_seqnum) + target = str(component_id) formatted_args = ['%.3f' % (x) if isinstance(x, float) else str(x) for x in args] if keywords: @@ -461,7 +498,7 @@ def call_nonblocking(self, component_id, method_name, *args, **keywords): 'init_call', method_name, *args, **keywords) call_id = self._get_service_response(msg_id, True) - self.call_targets[call_id] = (target, method_name, args) + self.call_targets[call_id] = (target, method_name, args, time.time()) return call_id def call(self, component_id, method_name, *args, **keywords): @@ -495,18 +532,22 @@ def wait_call(self, call_id, block=True): """ try: - (target, method_name, args) = self.call_targets[call_id] + (target, method_name, args, start_time) = self.call_targets[call_id] except KeyError: self.exception('Invalid call_id in wait-call() : %s', call_id) raise msg_id = self._invoke_service(self.fwk.component_id, 'wait_call', call_id, block) response = self._get_service_response(msg_id, block=True) - formatted_args = ['%.3f' % (x) if isinstance(x, float) - else str(x) for x in args] - self._send_monitor_event('IPS_CALL_END', 'Target = ' + - target + ':' + method_name + '(' + - str(*formatted_args) + ')') + formatted_args = ','.join('%.3f' % (x) if isinstance(x, float) + else str(x) for x in args) + target_full = f'{target}:{method_name}({formatted_args})' + self._send_monitor_event('IPS_CALL_END', 'Target = ' + target_full, + start_time=start_time, + end_time=time.time(), + elapsed_time=time.time()-start_time, + target=target, + operation=f'{method_name}({formatted_args})') del self.call_targets[call_id] return response @@ -626,20 +667,22 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): 'init_task', nproc, binary_fullpath, working_dir, task_ppn, block, whole_nodes, whole_socks, task_cpp, *args) - (task_id, command, env_update) = self._get_service_response(msg_id, block=True) + (task_id, command, env_update, cores_allocated) = self._get_service_response(msg_id, block=True) except Exception: raise - task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) + task_id = self._launch_task(nproc, working_dir, task_id, command, cores_allocated, env_update, tag, keywords, binary, args) if env_update: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}', + procs_requested=nproc, cores_allocated=cores_allocated) else: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}', + procs_requested=nproc, cores_allocated=cores_allocated) return task_id - def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, keywords): + def _launch_task(self, nproc, working_dir, task_id, command, cores_allocated, env_update, tag, keywords, binary, args): log_filename = keywords.get('logfile') timeout = keywords.get("timeout", 1.e9) @@ -685,7 +728,7 @@ def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, ke # FIXME: process Monitoring Command : ps --no-headers -o pid,state pid1 pid2 pid3 ... - self.task_map[task_id] = (process, time.time(), timeout) + self.task_map[task_id] = (process, time.time(), timeout, nproc, cores_allocated, command, binary, args) return task_id # process.pid def launch_task_pool(self, task_pool_name, launch_interval=0.0): @@ -732,18 +775,24 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): if launch_interval > 0: time.sleep(launch_interval) task = queued_tasks[task_name] - (task_id, command, env_update) = allocated_tasks[task_name] + (task_id, command, env_update, cores_allocated) = allocated_tasks[task_name] tag = task.keywords.get('tag', 'None') - active_tasks[task_name] = self._launch_task(task.nproc, task.working_dir, task_id, command, env_update, tag, task.keywords) + active_tasks[task_name] = self._launch_task(task.nproc, + task.working_dir, task_id, command, cores_allocated, + env_update, tag, task.keywords, task.binary, task.args) if env_update: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}' - f', env = {env_update}') + f', env = {env_update}', + procs_requested=task.nproc, + cores_allocated=cores_allocated) else: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', - f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}') + f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}', + procs_requested=task.nproc, + cores_allocated=cores_allocated) return active_tasks @@ -759,7 +808,7 @@ def kill_task(self, task_id): :rtype: bool """ try: - process, _, _ = self.task_map[task_id] + process, *_ = self.task_map[task_id] # TODO: process and start_time will have to be accessed as shown # below if this task can be relaunched to support FT... except KeyError: @@ -805,7 +854,7 @@ def wait_task_nonblocking(self, task_id): :return: return value of task if finished else None """ try: - process, start_time, timeout = self.task_map[task_id] + process, start_time, timeout, *_ = self.task_map[task_id] # TODO: process and start_time will have to be accessed as shown # below if this task can be relaunched to support FT... except KeyError: @@ -842,7 +891,7 @@ def wait_task(self, task_id, timeout=-1, delay=1): :return: return value of task """ try: - process, start_time, _ = self.task_map[task_id] + process, start_time, _, nproc, cores, _, binary, args = self.task_map[task_id] except KeyError: self.exception('Error: unrecognizable task_id = %s ', str(task_id)) raise @@ -857,14 +906,23 @@ def wait_task(self, task_id, timeout=-1, delay=1): time.sleep(delay) else: break + + finish_time = time.time() if task_retval is None: process.kill() task_retval = process.wait() self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' % - (str(task_id), time.time() - start_time)) + (str(task_id), finish_time - start_time)) else: self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' % - (str(task_id), time.time() - start_time)) + (str(task_id), finish_time - start_time), + start_time=start_time, + end_time=finish_time, + elapsed_time=finish_time - start_time, + procs_requested=nproc, + cores_allocated=cores, + target=binary, + operation=" ".join(args)) del self.task_map[task_id] try: @@ -1272,7 +1330,11 @@ def stage_input_files(self, input_file_list): self._send_monitor_event(eventType='IPS_STAGE_INPUTS', comment='Elapsed time = %.3f Path = %s Files = %s' % (elapsed_time, os.path.abspath(inputDir), - str(input_file_list))) + str(input_file_list)), + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_input_files", + operation=str(input_file_list)) def stage_subflow_output_files(self, subflow_name='ALL'): """Gather outputs from sub-workflows. Sub-workflow output is defined @@ -1458,7 +1520,11 @@ def stage_output_files(self, timeStamp, file_list, keep_old_files=True, save_pla elapsed_time = time.time() - start_time self._send_monitor_event('IPS_STAGE_OUTPUTS', 'Elapsed time = %.3f Path = %s Files = %s' % - (elapsed_time, output_dir, str(file_list))) + (elapsed_time, output_dir, str(file_list)), + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_output_files", + operation=str(file_list)) def save_restart_files(self, timeStamp, file_list): """ @@ -1565,7 +1631,11 @@ def stage_state(self, state_files=None): elapsed_time = time.time() - start_time self._send_monitor_event('IPS_STAGE_STATE', 'Elapsed time = %.3f files = %s Success' % - (elapsed_time, ' '.join(files))) + (elapsed_time, ' '.join(files)), + start_time=start_time, + elapsed_time=elapsed_time, + target="stage_state", + operation=str(files)) def update_state(self, state_files=None): """ @@ -1600,7 +1670,11 @@ def update_state(self, state_files=None): elapsed_time = time.time() - start_time self._send_monitor_event('IPS_UPDATE_STATE', 'Elapsed time = %.3f files = %s Success' % - (elapsed_time, ' '.join(files))) + (elapsed_time, ' '.join(files)), + start_time=start_time, + elapsed_time=elapsed_time, + target="update_state", + operation=str(files)) def merge_current_state(self, partial_state_file, logfile=None, merge_binary=None): """ @@ -1703,13 +1777,15 @@ def process_events(self): def send_portal_event(self, event_type="COMPONENT_EVENT", event_comment="", - event_time=None): + event_time=None, + elapsed_time=None): """ Send event to web portal. """ return self._send_monitor_event(eventType=event_type, comment=event_comment, - event_time=event_time) + event_time=event_time, + elapsed_time=elapsed_time) def log(self, msg, *args): """ @@ -1785,8 +1861,10 @@ def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, self._send_monitor_event('IPS_TASK_POOL_BEGIN', 'task_pool = %s ' % task_pool_name) task_pool: TaskPool = self.task_pools[task_pool_name] retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppn, launch_interval, use_shifter) + elapsed_time = time.time() - start_time self._send_monitor_event('IPS_TASK_POOL_END', 'task_pool = %s elapsed time = %.2f S' % - (task_pool_name, time.time() - start_time)) + (task_pool_name, elapsed_time), + elapsed_time=elapsed_time) return retval def get_finished_tasks(self, task_pool_name): @@ -2146,7 +2224,7 @@ def get_dask_finished_tasks_status(self): events.sort(key=itemgetter('event_time')) for event in events: - self.services.send_portal_event(**event) + self.services._send_monitor_event(**event) except Exception as e: # If it fails for any other reason, make sure we can continue self.services.exception('Error while reading dask worker log files: %s', str(e)) diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 4a2001c5..2ef07713 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -260,9 +260,9 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, self.fwk.debug('RM: get_allocation() returned %s', str(retval)) partial_node = retval[0] if partial_node: - (nodelist, corelist, ppn, max_ppn, accurateNodes) = retval[1:] + (nodelist, corelist, ppn, max_ppn, accurateNodes, cores_allocated) = retval[1:] else: - (nodelist, ppn, max_ppn, cpp, accurateNodes) = retval[1:] + (nodelist, ppn, max_ppn, cpp, accurateNodes, cores_allocated) = retval[1:] if partial_node: nodes = ','.join(nodelist) @@ -292,7 +292,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, 'launch_cmd': cmd, 'env_update': env_update} - return (task_id, cmd, env_update) + return (task_id, cmd, env_update, cores_allocated) def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, @@ -557,14 +557,14 @@ def init_task_pool(self, init_task_msg): except BadResourceRequestException as e: self.fwk.error("There has been a fatal error, %s requested %d too many processors in task %d", caller_id, e.deficit, e.task_id) - for task_id, _, _ in ret_dict.values(): + for task_id, _, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise except ResourceRequestMismatchException as e: self.fwk.error("There has been a fatal error, %s requested too few processors per node to launch task %d (request: procs = %d, ppn = %d)", caller_id, e.task_id, e.nproc, e.ppn) - for task_id, _, _ in ret_dict.values(): + for task_id, _, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index b91af60b..09113b3b 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -4,6 +4,7 @@ import json import glob import socketserver +import hashlib import pytest from ipsframework import Framework, TaskPool @@ -259,6 +260,7 @@ def test_helloworld_task_pool_dask(tmpdir, capfd): @pytest.mark.skipif(sys.platform == 'darwin', reason="This doesn't work with macOS") +@pytest.mark.timeout(120) def test_helloworld_portal(tmpdir, capfd): data_dir = os.path.dirname(__file__) copy_config_and_replace(os.path.join(data_dir, "hello_world.ips"), tmpdir.join("hello_world.ips"), tmpdir, portal=True) @@ -306,7 +308,8 @@ def handle(self): framework.run() - for _ in range(8): + # just get the first 5 events + for _ in range(5): server.handle_request() captured = capfd.readouterr() @@ -339,7 +342,7 @@ def handle(self): assert '.eventlog' in exts # check data sent to portal - assert len(data) >= 6 + assert len(data) == 5 # get first event to check event = json.loads(data[0].split('\r\n')[-1]) assert event['code'] == 'Framework' @@ -348,3 +351,24 @@ def handle(self): assert event['state'] == 'Running' assert event['sim_name'] == 'Hello_world_1' assert event['seqnum'] == 0 + assert 'ips_version' in event + + # get last event to check + event = json.loads(data[-1].split('\r\n')[-1]) + assert event['code'] == 'DRIVERS_HELLO_HelloDriver' + assert event['eventtype'] == 'IPS_CALL_END' + assert event['comment'] == 'Target = Hello_world_1@HelloWorker@2:init(0.000)' + assert event['state'] == 'Running' + assert event['sim_name'] == 'Hello_world_1' + assert 'trace' in event + trace = event['trace'] + assert 'duration' in trace + assert 'timestamp' in trace + assert 'id' in trace + assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16] + assert 'traceId' in trace + assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest() + assert 'parentId' in trace + assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16] + assert 'localEndpoint' in trace + assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2' diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index e572565e..76aadfc9 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -107,7 +107,7 @@ def test_dask(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -119,7 +119,7 @@ def test_dask(tmpdir): task_end_comments = [e.get('comment')[:-4] for e in lines if e.get('eventtype') == "IPS_TASK_END"] for task in range(4): - assert f'task_name = task_{task}, elasped time = 1' in task_end_comments + assert f'task_name = task_{task}, elapsed time = 1' in task_end_comments @pytest.mark.skipif(shutil.which('shifter') is not None, @@ -154,7 +154,7 @@ def test_dask_shifter_fail(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 8 + assert len(lines) == 12 assert lines[-1].get('eventtype') == "IPS_END" assert lines[-1].get('comment') == "Simulation Execution Error" @@ -210,7 +210,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 @@ -222,7 +222,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): task_end_comments = [e.get('comment')[:-4] for e in lines if e.get('eventtype') == "IPS_TASK_END"] for task in range(4): - assert f'task_name = task_{task}, elasped time = 1' in task_end_comments + assert f'task_name = task_{task}, elapsed time = 1' in task_end_comments # check shifter.log file with open(str(tmpdir.join('/work/DASK__dask_worker_2').join('shifter.log')), 'r') as f: @@ -270,7 +270,7 @@ def test_dask_timeout(tmpdir): with open(json_files[0], 'r') as json_file: lines = json_file.readlines() lines = [json.loads(line.strip()) for line in lines] - assert len(lines) == 22 + assert len(lines) == 28 eventtypes = [e.get('eventtype') for e in lines] assert eventtypes.count('IPS_LAUNCH_DASK_TASK') == 4 diff --git a/tests/new/test_ips_framework.py b/tests/new/test_ips_framework.py index 93bd044d..89869f00 100644 --- a/tests/new/test_ips_framework.py +++ b/tests/new/test_ips_framework.py @@ -125,11 +125,11 @@ def test_framework_simple(tmpdir, capfd): with open(json_files[0], 'r') as json_file: json_lines = json_file.readlines() - assert len(json_lines) == 3 + assert len(json_lines) == 9 event0 = json.loads(json_lines[0]) event1 = json.loads(json_lines[1]) - event2 = json.loads(json_lines[2]) + event2 = json.loads(json_lines[8]) assert event0['eventtype'] == 'IPS_START' assert event1['eventtype'] == 'IPS_RESOURCE_ALLOC' diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index be9f0cfe..c73c8d41 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -403,82 +403,87 @@ def test_init_task_srun(tmpdir): rm.accurateNodes = True def init_final_task(nproc, tppn, tcpt=0): - task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', - nproc, 'exe', '/dir', tppn, True, - True, True, tcpt)) + task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + nproc, 'exe', '/dir', tppn, True, + True, True, tcpt)) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) - return task_id, cmd + return task_id, cmd, cores_allocated - task_id, cmd = init_final_task(1, 0) + task_id, cmd, cores = init_final_task(1, 0) assert task_id == 1 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 - task_id, cmd = init_final_task(2, 0) + task_id, cmd, cores = init_final_task(2, 0) assert task_id == 2 assert cmd == "srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task(3, 0) - task_id, cmd = init_final_task(4, 0) + task_id, cmd, cores = init_final_task(4, 0) assert task_id == 4 assert cmd == "srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 4 with pytest.raises(BadResourceRequestException): init_final_task(5, 0) - task_id, cmd = init_final_task(1, 1) + task_id, cmd, cores = init_final_task(1, 1) assert task_id == 6 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 2 - task_id, cmd = init_final_task(2, 1) + task_id, cmd, cores = init_final_task(2, 1) assert task_id == 7 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " + assert cores == 4 with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 2) + task_id, cmd, cores = init_final_task(1, 1, 2) assert task_id == 9 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 1) + task_id, cmd, cores = init_final_task(1, 1, 1) assert task_id == 10 assert cmd == "srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(1, 1, 4) + task_id, cmd, cores = init_final_task(1, 1, 4) assert task_id == 11 assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 2) + task_id, cmd, cores = init_final_task(2, 1, 2) assert task_id == 12 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 1) + task_id, cmd, cores = init_final_task(2, 1, 1) assert task_id == 13 assert cmd == "srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() - task_id, cmd = init_final_task(2, 1, 12) + task_id, cmd, cores = init_final_task(2, 1, 12) assert task_id == 14 assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (12) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") # start two task, second should fail with Insufficient Resources depending on block - task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', - 4, 'exe', '/dir', 0, True, - True, True, 0)) + task_id, cmd, _, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + 4, 'exe', '/dir', 0, True, + True, True, 0)) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', @@ -518,80 +523,88 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): if msg is None: msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False, tcpp) for n in range(number_of_tasks)} retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) - for task_id, _, _ in retval.values(): + for task_id, _, _, _ in retval.values(): tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return retval retval = init_final_task_pool(1, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 1 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 retval = init_final_task_pool(2, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 2 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task_pool(3, 0, 1) retval = init_final_task_pool(4, 0, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 4 assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 with pytest.raises(BadResourceRequestException): init_final_task_pool(5, 0, 1) retval = init_final_task_pool(1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 6 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 2 retval = init_final_task_pool(2, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 7 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(3, 1, 1) retval = init_final_task_pool(1, 0, 2) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 9 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] - assert task_id == 10 - assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] + assert cores == 2 retval = init_final_task_pool(2, 0, 2) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 11 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] assert task_id == 12 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 retval = init_final_task_pool(4, 0, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 13 assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' + assert cores == 4 # now try with task_cpp set fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 15 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -599,7 +612,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 16 assert cmd == 'srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -607,7 +620,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(1, 1, 1, 4) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 17 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') @@ -615,7 +628,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 2) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 18 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -623,7 +636,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 1) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 19 assert cmd == 'srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() @@ -631,7 +644,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.reset_mock() retval = init_final_task_pool(2, 1, 1, 4) assert len(retval) == 1 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 20 assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') @@ -641,12 +654,14 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} retval = init_final_task_pool(msg=msg) assert len(retval) == 2 - task_id, cmd, _ = retval['task0'] + task_id, cmd, _, cores = retval['task0'] assert task_id == 21 assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' - task_id, cmd, _ = retval['task1'] + assert cores == 2 + task_id, cmd, _, cores = retval['task1'] assert task_id == 22 assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' + assert cores == 2 # one good task, one bad task msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0),