Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/examples/dask/simulation_log.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = method, elasped time = 0.50s",
"comment": "task_name = method, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = function, elasped time = 0.50s",
"comment": "task_name = function, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.85",
"state": "Running",
"comment": "task_name = binary, elasped time = 0.52s",
"comment": "task_name = binary, elapsed time = 0.52s",
}
14 changes: 8 additions & 6 deletions ipsframework/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(self, services, config):
self.config = config
self.start_time = 0.0
self.sys_exit = None
self.method_name = None
self.args = None
for i in config.keys():
try:
setattr(self, i, config[i])
Expand Down Expand Up @@ -121,19 +123,19 @@ def __run__(self):
self.services.log('Received Message ')
sender_id = msg.sender_id
call_id = msg.call_id
method_name = msg.target_method
args = msg.args
self.method_name = msg.target_method
self.args = msg.args
keywords = msg.keywords
formatted_args = ['%.3f' % (x) if isinstance(x, float)
else str(x) for x in args]
else str(x) for x in self.args]
if keywords:
formatted_args += [" %s=" % k + str(v) for (k, v) in keywords.items()]

self.services.debug('Calling method ' + method_name +
self.services.debug('Calling method ' + self.method_name +
"(" + ' ,'.join(formatted_args) + ")")
try:
method = getattr(self, method_name)
retval = method(*args, **keywords)
method = getattr(self, self.method_name)
retval = method(*self.args, **keywords)
except Exception as e:
self.services.exception('Uncaught Exception in component method.')
response_msg = MethodResultMessage(self.component_id,
Expand Down
62 changes: 50 additions & 12 deletions ipsframework/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import logging
import os
import time
import hashlib
from ipsframework import platformspec
from ipsframework.messages import Message, ServiceRequestMessage, \
ServiceResponseMessage, MethodInvokeMessage
Expand Down Expand Up @@ -181,7 +182,7 @@ def __init__(self, config_file_list, log_file_name, platform_file_name=None,
logger.addHandler(self.ch)
self.logger = logger
self.verbose_debug = verbose_debug
self.outstanding_calls_list = []
self.outstanding_calls_list = {}
self.call_queue_map = {}

# add the handler to the root logger
Expand Down Expand Up @@ -421,7 +422,7 @@ def run(self):
for method in ['step', 'finalize']:
req_msg = ServiceRequestMessage(self.component_id, self.component_id,
comp_id, 'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, None, str(comp_id), method, 0))

outstanding_sim_calls[str(comp_id)] = msg_list

Expand All @@ -447,7 +448,7 @@ def run(self):
self.component_id,
comp_id,
'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, sim_name, str(comp_id), method, 0))
# SIMYAN: add the msg_list to the outstanding sim calls
if msg_list:
outstanding_sim_calls[sim_name] = msg_list
Expand All @@ -460,11 +461,15 @@ def run(self):
# send off first round of invocations...
try:
for sim_name, msg_list in outstanding_sim_calls.items():
msg = msg_list.pop(0)
msg, sim_name, comp, method, arg = msg_list.pop(0)
self.debug('Framework sending message %s ', msg.__dict__)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_BEGIN')
call_id = self.task_manager.init_call(msg, manage_return=False)
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()
except Exception:
self.exception('encountered exception during fwk.run() sending first round of invocations (init of inits and fwk comps)')
self.terminate_all_sims(status=Message.FAILURE)
Expand Down Expand Up @@ -503,7 +508,15 @@ def run(self):
self.task_manager.return_call(msg)
continue
# Message is a result from a framework invocation
self.outstanding_calls_list.remove(msg.call_id)
sim_name, comp, method, arg, start_time = self.outstanding_calls_list.pop(msg.call_id)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_END',
start_time=start_time,
end_time=time.time(),
target=comp,
operation=f'{method}({arg})')
sim_msg_list = self.call_queue_map[msg.call_id]
del self.call_queue_map[msg.call_id]
if msg.status == Message.FAILURE:
Expand All @@ -520,10 +533,14 @@ def run(self):
comment = 'Simulation Ended'
ok = True
try:
next_call_msg = sim_msg_list.pop(0)
next_call_msg, sim_name, comp, method, arg = sim_msg_list.pop(0)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_BEGIN')
call_id = self.task_manager.init_call(next_call_msg,
manage_return=False)
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()
self.call_queue_map[call_id] = sim_msg_list
except IndexError:
sim_comps = self.config_manager.get_component_map() # Get any new dynamic simulations
Expand Down Expand Up @@ -554,16 +571,16 @@ def initiate_new_simulation(self, sim_name):
req_msg = ServiceRequestMessage(self.component_id,
self.component_id, comp_id,
'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, sim_name, str(comp_id), method, 0))

# send off first round of invocations...
msg = msg_list.pop(0)
msg, sim_name, comp, method, arg = msg_list.pop(0)
self.debug('Framework sending message %s ', msg.__dict__)
call_id = self.task_manager.init_call(msg, manage_return=False)
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()

def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'):
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', target=None, operation=None, start_time=None, end_time=None):
"""
Publish a portal monitor event to the *_IPS_MONITOR* event topic.
Event topics that start with an underscore are reserved for use by the
Expand Down Expand Up @@ -633,6 +650,27 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'):
portal_data['state'] = 'Completed'
portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z',
time.localtime())
# Zipkin json format
portal_data['trace'] = {"timestamp": int(self.start_time*1e6),
"duration": int((time.time() - self.start_time)*1e6),
"localEndpoint": {
"serviceName": str(self.component_id)
},
"id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16],
'tags': {'total_cores': str(self.resource_manager.total_cores)}}
elif eventType == "IPS_CALL_END":
trace = {} # Zipkin json format
if start_time is not None and end_time is not None:
trace['timestamp'] = int(start_time*1e6) # convert to microsecond
trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond
if target is not None:
trace['localEndpoint'] = {"serviceName": target}
trace['name'] = operation
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16]

if trace:
portal_data['trace'] = trace

event_body = {}
event_body['sim_name'] = sim_name
Expand Down
3 changes: 3 additions & 0 deletions ipsframework/portalBridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ def process_event(self, topicName, theEvent):
portal_data['portal_runid'] = sim_data.portal_runid
portal_data['seqnum'] = sim_data.counter

if 'trace' in portal_data:
portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest()

self.send_event(sim_data, portal_data)
sim_data.counter += 1
self.counter += 1
Expand Down
32 changes: 16 additions & 16 deletions ipsframework/resourceManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def get_allocation(self, comp_id, nproc, task_id,
else:
try:
self.processes += nproc
k = 0
cores_allocated = 0
alloc_procs = 0
node_file_entries = []
if whole_nodes:
Expand All @@ -371,10 +371,10 @@ def get_allocation(self, comp_id, nproc, task_id,
self.avail_nodes.remove(n)
self.alloc_nodes.append(n)
node_file_entries.append((n, cores))
k += procs
self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
cores_allocated += procs
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
elif whole_socks:
# -------------------------------
# whole sock allocation
Expand All @@ -388,17 +388,17 @@ def get_allocation(self, comp_id, nproc, task_id,
whole_socks,
task_id, comp_id,
to_alloc)
k += len(cores)
cores_allocated += len(cores)
alloc_procs = min([ppn, len(cores)])
node_file_entries.append((n, cores))
if n not in self.alloc_nodes:
self.alloc_nodes.append(n)
if node.avail_cores - node.total_cores == 0:
self.avail_nodes.remove(n)

self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
else:
# -------------------------------
# single core allocation
Expand All @@ -407,22 +407,22 @@ def get_allocation(self, comp_id, nproc, task_id,
node = self.nodes[n]
if node.avail_cores > 0:
to_alloc = min([ppn, node.avail_cores,
nproc - k])
nproc - cores_allocated])
self.fwk.debug("allocate task_id %d node %s %d cores" % (task_id, n, to_alloc))
procs, cores = node.allocate(whole_nodes,
whole_socks,
task_id, comp_id,
to_alloc)
k += procs
cores_allocated += procs
node_file_entries.append((n, cores))
if n not in self.alloc_nodes:
self.alloc_nodes.append(n)
if node.avail_cores - node.total_cores == 0:
self.avail_nodes.remove(n)

self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
except Exception:
print("Available Nodes:")
for nm in self.avail_nodes:
Expand All @@ -443,10 +443,10 @@ def get_allocation(self, comp_id, nproc, task_id,

if whole_nodes:
self.report_RM_status("allocation for task %d using whole nodes" % task_id)
return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes
return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes, cores_allocated
else:
self.report_RM_status("allocation for task %d using partial nodes" % task_id)
return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes
return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes, cores_allocated

def check_whole_node_cap(self, nproc, ppn):
"""
Expand Down
Loading