From 97b5228f9b6ec69f06a6a3bb87bfea55bfc48727 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 18 Nov 2021 15:46:31 -0500 Subject: [PATCH 01/10] Set cpus-per-task '-c' and OpenMP environment variables in srun command --- ipsframework/services.py | 10 ++++-- ipsframework/taskManager.py | 19 ++++++++-- tests/new/test_taskManager.py | 67 +++++++++++++++++++++++++---------- 3 files changed, 72 insertions(+), 24 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 1e29015b..a6af7591 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -628,8 +628,12 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): raise task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) - self._send_monitor_event('IPS_LAUNCH_TASK', 'task_id = %s , Tag = %s , nproc = %d , Target = %s' % - (str(task_id), tag, int(nproc), command)) + + if env_update: + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}') + else: + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}') + return task_id def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, keywords): @@ -662,7 +666,7 @@ def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, ke try: self.debug('Launching command : %s', command) if env_update: - new_env = os.environ + new_env = os.environ.copy() new_env.update(env_update) process = subprocess.Popen(cmd_lst, stdout=task_stdout, stderr=task_stderr, diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 668070bc..a159625e 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -504,8 +504,23 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, nproc_flag = '-n' nnodes_flag = '-N' num_nodes = len(nodes.split(',')) - cmd = ' '.join([self.task_launch_cmd, nnodes_flag, - str(num_nodes), nproc_flag, str(nproc)]) + if partial_nodes: + cmd = ' '.join([self.task_launch_cmd, + nnodes_flag, str(num_nodes), + nproc_flag, str(nproc)]) + else: + cpuptask_flag = '-c' + num_cores = self.resource_mgr.cores_per_node + cpuptask = num_cores//ppn + cpubind_flag = '--cpu-bind=cores' + cmd = ' '.join([self.task_launch_cmd, + nnodes_flag, str(num_nodes), + nproc_flag, str(nproc), + cpuptask_flag, str(cpuptask), + cpubind_flag]) + env_update = {'OMP_PLACES': 'threads', + 'OMP_PROC_BIND': 'spread', + 'OMP_NUM_THREADS': cpuptask} else: self.fwk.error("invalid task launch command.") raise RuntimeError("invalid task launch command.") diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index da96b9e5..387d5f9a 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -320,6 +320,7 @@ def test_build_launch_cmd_srun(): # test eval tm.task_launch_cmd = 'srun' tm.resource_mgr = mock.Mock(nodes=['node1']) + tm.resource_mgr.cores_per_node = 2 cmd = tm.build_launch_cmd(nproc=4, binary='executable', @@ -329,7 +330,7 @@ def test_build_launch_cmd_srun(): max_ppn=None, nodes='n1,n2', accurateNodes=None, - partial_nodes=None, + partial_nodes=True, task_id=None) assert cmd == ('srun -N 2 -n 4 executable ', None) @@ -342,11 +343,39 @@ def test_build_launch_cmd_srun(): max_ppn=None, nodes='n1,n2', accurateNodes=None, - partial_nodes=None, + partial_nodes=True, task_id=None) assert cmd == ('srun -N 2 -n 4 executable 13 42', None) + cmd = tm.build_launch_cmd(nproc=4, + binary='executable', + cmd_args=(), + working_dir=None, + ppn=2, + max_ppn=None, + nodes='n1,n2', + accurateNodes=None, + partial_nodes=False, + task_id=None) + + assert cmd == ('srun -N 2 -n 4 -c 1 --cpu-bind=cores executable ', + {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 1}) + + cmd = tm.build_launch_cmd(nproc=2, + binary='executable', + cmd_args=('13', '42'), + working_dir=None, + ppn=1, + max_ppn=None, + nodes='n1,n2', + accurateNodes=None, + partial_nodes=False, + task_id=None) + + assert cmd == ('srun -N 2 -n 2 -c 2 --cpu-bind=cores executable 13 42', + {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 2}) + def test_init_task_srun(tmpdir): # this will combine calls to ResourceManager.get_allocation and @@ -381,29 +410,29 @@ def init_final_task(nproc, tppn): task_id, cmd = init_final_task(1, 0) assert task_id == 1 - assert cmd == "srun -N 1 -n 1 exe " + assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " task_id, cmd = init_final_task(2, 0) assert task_id == 2 - assert cmd == "srun -N 1 -n 2 exe " + assert cmd == "srun -N 1 -n 2 -c 1 --cpu-bind=cores exe " with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task(3, 0) task_id, cmd = init_final_task(4, 0) assert task_id == 4 - assert cmd == "srun -N 2 -n 4 exe " + assert cmd == "srun -N 2 -n 4 -c 1 --cpu-bind=cores exe " with pytest.raises(BadResourceRequestException): init_final_task(5, 0) task_id, cmd = init_final_task(1, 1) assert task_id == 6 - assert cmd == "srun -N 1 -n 1 exe " + assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " task_id, cmd = init_final_task(2, 1) assert task_id == 7 - assert cmd == "srun -N 2 -n 2 exe " + assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) @@ -460,13 +489,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 1 - assert cmd == 'srun -N 1 -n 1 exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' retval = init_final_task_pool(2, 0, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 2 - assert cmd == 'srun -N 1 -n 2 exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe0 arg0' with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task_pool(3, 0, 1) @@ -475,7 +504,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 4 - assert cmd == 'srun -N 2 -n 4 exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --cpu-bind=cores exe0 arg0' with pytest.raises(BadResourceRequestException): init_final_task_pool(5, 0, 1) @@ -484,13 +513,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 6 - assert cmd == 'srun -N 1 -n 1 exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' retval = init_final_task_pool(2, 1, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 7 - assert cmd == 'srun -N 2 -n 2 exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(3, 1, 1) @@ -499,25 +528,25 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 9 - assert cmd == 'srun -N 1 -n 1 exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 10 - assert cmd == 'srun -N 1 -n 1 exe1 arg1' + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe1 arg1' retval = init_final_task_pool(2, 0, 2) assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 11 - assert cmd == 'srun -N 1 -n 2 exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 12 - assert cmd == 'srun -N 1 -n 2 exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe1 arg1' retval = init_final_task_pool(4, 0, 2) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 13 - assert cmd == 'srun -N 2 -n 4 exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --cpu-bind=cores exe0 arg0' # different size tasks msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), @@ -526,10 +555,10 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 15 - assert cmd == 'srun -N 1 -n 1 exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 16 - assert cmd == 'srun -N 1 -n 2 exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe1 arg1' # one good task, one bad task msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), From f28ff0049f3f0ce079dc4d43504d50b9aec39f0b Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 19 Nov 2021 11:04:22 -0500 Subject: [PATCH 02/10] Add user specified task cores_per_task --- ipsframework/services.py | 12 +++++++-- ipsframework/taskManager.py | 23 +++++++++++++----- setup.cfg | 2 +- tests/new/test_taskManager.py | 46 +++++++++++++++++++++++++++++++---- 4 files changed, 69 insertions(+), 14 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index a6af7591..30b9ba91 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -181,6 +181,7 @@ def __init__(self, fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name): self.sub_flows = {} self.binary_fullpath_cache = {} self.ppn = 0 + self.cpp = 0 self.shared_nodes = False def __initialize__(self, component_ref): @@ -241,6 +242,11 @@ def __initialize__(self, component_ref): except Exception: self.ppn = 0 + try: + self.cpp = int(conf['CPUS_PER_PROC']) + except Exception: + self.cpp = 0 + if self.sim_conf['SIMULATION_MODE'] == 'RESTART': if self.sim_conf['RESTART_TIME'] == 'LATEST': chkpts = glob.glob(os.path.join(self.sim_conf['RESTART_ROOT'], 'restart', '*')) @@ -611,6 +617,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): self.binary_fullpath_cache[binary] = binary_fullpath task_ppn = keywords.get('task_ppn', self.ppn) + task_cpp = keywords.get('task_cpp', self.cpp) block = keywords.get('block', True) tag = keywords.get('tag', 'None') @@ -622,7 +629,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): msg_id = self._invoke_service(self.fwk.component_id, 'init_task', nproc, binary_fullpath, working_dir, task_ppn, block, - whole_nodes, whole_socks, *args) + whole_nodes, whole_socks, task_cpp, *args) (task_id, command, env_update) = self._get_service_response(msg_id, block=True) except Exception: raise @@ -711,9 +718,10 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): task_ppn = task.keywords.get('task_ppn', self.ppn) wnodes = task.keywords.get('whole_nodes', not self.shared_nodes) wsocks = task.keywords.get('whole_sockets', not self.shared_nodes) + task_cpp = task.keywords.get('task_cpp', 0) submit_dict[task_name] = (task.nproc, task.working_dir, task.binary, task.args, - task_ppn, wnodes, wsocks) + task_ppn, wnodes, wsocks, task_cpp) try: msg_id = self._invoke_service(self.fwk.component_id, diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index a159625e..1c3aecf9 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -222,9 +222,10 @@ def init_task(self, init_task_msg): block = init_task_msg.args[4] # Block waiting for available resources wnodes = init_task_msg.args[5] wsocks = init_task_msg.args[6] + tcpp = init_task_msg.args[7] # SIMYAN: increased arguments - cmd_args = init_task_msg.args[7:] + cmd_args = init_task_msg.args[8:] try: return self._init_task(caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks, cmd_args) @@ -279,7 +280,8 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks working_dir, ppn, max_ppn, nodes, accurateNodes, - False, task_id) + False, task_id, + cpp) self.curr_task_table[task_id] = {'component': caller_id, 'status': 'init_task', @@ -293,7 +295,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, - task_id, core_list=''): + task_id, tcpp=0, core_list=''): """ Construct task launch command to be executed by the component. @@ -511,16 +513,25 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, else: cpuptask_flag = '-c' num_cores = self.resource_mgr.cores_per_node - cpuptask = num_cores//ppn + max_cpp = num_cores//ppn + if tcpp > 0: + if tcpp > max_cpp: + self.fwk.warning(f"task cpp ({tcpp}) exceeds maximum possible for {ppn} procs per node " + f"with {num_cores} cores per node, using {max_cpp} cpus per proc instead") + cpp = max_cpp + else: + cpp = tcpp + else: + cpp = max_cpp cpubind_flag = '--cpu-bind=cores' cmd = ' '.join([self.task_launch_cmd, nnodes_flag, str(num_nodes), nproc_flag, str(nproc), - cpuptask_flag, str(cpuptask), + cpuptask_flag, str(cpp), cpubind_flag]) env_update = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', - 'OMP_NUM_THREADS': cpuptask} + 'OMP_NUM_THREADS': cpp} else: self.fwk.error("invalid task launch command.") raise RuntimeError("invalid task launch command.") diff --git a/setup.cfg b/setup.cfg index 24af6d0b..4416ba84 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [flake8] max-line-length = 160 -max-complexity = 39 +max-complexity = 41 exclude = ipsframework/configobj.py, ipsframework/six.py, diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index 387d5f9a..f9ad04ce 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -400,10 +400,10 @@ def test_init_task_srun(tmpdir): tm.task_launch_cmd = 'srun' rm.accurateNodes = True - def init_final_task(nproc, tppn): + def init_final_task(nproc, tppn, tcpt=0): task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', nproc, 'exe', '/dir', tppn, True, - True, True)) + True, True, tcpt)) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return task_id, cmd @@ -437,20 +437,56 @@ def init_final_task(nproc, tppn): with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) + fwk.reset_mock() + task_id, cmd = init_final_task(1, 1, 2) + assert task_id == 9 + assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + fwk.warning.assert_not_called() + + fwk.reset_mock() + task_id, cmd = init_final_task(1, 1, 1) + assert task_id == 10 + assert cmd == "srun -N 1 -n 1 -c 1 --cpu-bind=cores exe " + fwk.warning.assert_not_called() + + fwk.reset_mock() + task_id, cmd = init_final_task(1, 1, 4) + assert task_id == 11 + assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + fwk.warning.assert_called_once_with("task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") + + fwk.reset_mock() + task_id, cmd = init_final_task(2, 1, 2) + assert task_id == 12 + assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " + fwk.warning.assert_not_called() + + fwk.reset_mock() + task_id, cmd = init_final_task(2, 1, 1) + assert task_id == 13 + assert cmd == "srun -N 2 -n 2 -c 1 --cpu-bind=cores exe " + fwk.warning.assert_not_called() + + fwk.reset_mock() + task_id, cmd = init_final_task(2, 1, 12) + assert task_id == 14 + assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " + fwk.warning.assert_called_once_with("task cpp (12) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") + # start two task, second should fail with Insufficient Resources depending on block task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 4, 'exe', '/dir', 0, True, - True, True)) + True, True, 0)) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 1, 'exe', '/dir', 0, True, - True, True)) + True, True, 0)) with pytest.raises(InsufficientResourcesException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 1, 'exe', '/dir', 0, False, - True, True)) + True, True, 0)) def test_init_task_pool_srun(tmpdir): From c404876ca339c1e5bd65d5eab20a20f815a452c7 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 19 Nov 2021 14:20:44 -0500 Subject: [PATCH 03/10] Move cpus_per_proc calculation from TaskManager to ResourceManager --- ipsframework/resourceManager.py | 18 +++++++- ipsframework/taskManager.py | 26 ++++------- tests/new/test_taskManager.py | 76 +++++++++++++++++++++++++++------ 3 files changed, 88 insertions(+), 32 deletions(-) diff --git a/ipsframework/resourceManager.py b/ipsframework/resourceManager.py index 27d47da3..2ac26f62 100644 --- a/ipsframework/resourceManager.py +++ b/ipsframework/resourceManager.py @@ -256,7 +256,7 @@ def add_nodes(self, listOfNodes): # RM getAllocation # pylint: disable=inconsistent-return-statements def get_allocation(self, comp_id, nproc, task_id, - whole_nodes, whole_socks, task_ppn=0): + whole_nodes, whole_socks, task_ppn=0, task_cpp=0): """ Traverse available nodes to return: @@ -316,6 +316,20 @@ def get_allocation(self, comp_id, nproc, task_id, allocation_possible = False if whole_nodes: allocation_possible, nodes = self.check_whole_node_cap(nproc, ppn) + + if allocation_possible: + num_cores = self.cores_per_node + max_cpp = num_cores//ppn + if task_cpp > 0: + if task_cpp > max_cpp: + self.fwk.warning(f"task cpp ({task_cpp}) exceeds maximum possible for {ppn} procs per node " + f"with {num_cores} cores per node, using {max_cpp} cpus per proc instead") + cpp = max_cpp + else: + cpp = task_cpp + else: + cpp = max_cpp + elif whole_socks: allocation_possible, nodes = self.check_whole_sock_cap(nproc, ppn) else: @@ -429,7 +443,7 @@ def get_allocation(self, comp_id, nproc, task_id, if whole_nodes: self.report_RM_status("allocation for task %d using whole nodes" % task_id) - return not whole_nodes, nodes, ppn, self.max_ppn, self.accurateNodes + return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes else: self.report_RM_status("allocation for task %d using partial nodes" % task_id) return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 1c3aecf9..cd4acbc8 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -228,7 +228,7 @@ def init_task(self, init_task_msg): cmd_args = init_task_msg.args[8:] try: - return self._init_task(caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks, cmd_args) + return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args) except InsufficientResourcesException: if block: raise BlockedMessageException(init_task_msg, '***%s waiting for %d resources' % @@ -246,7 +246,7 @@ def init_task(self, init_task_msg): except Exception: raise - def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks, cmd_args): + def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args): # handle for task related things task_id = self.get_task_id() @@ -255,13 +255,14 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks task_id, wnodes, wsocks, - task_ppn=tppn) + task_ppn=tppn, + task_cpp=tcpp) self.fwk.debug('RM: get_allocation() returned %s', str(retval)) partial_node = retval[0] if partial_node: (nodelist, corelist, ppn, max_ppn, accurateNodes) = retval[1:] else: - (nodelist, ppn, max_ppn, accurateNodes) = retval[1:] + (nodelist, ppn, max_ppn, cpp, accurateNodes) = retval[1:] if partial_node: nodes = ','.join(nodelist) @@ -295,7 +296,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, - task_id, tcpp=0, core_list=''): + task_id, cpp=0, core_list=''): """ Construct task launch command to be executed by the component. @@ -512,17 +513,6 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, nproc_flag, str(nproc)]) else: cpuptask_flag = '-c' - num_cores = self.resource_mgr.cores_per_node - max_cpp = num_cores//ppn - if tcpp > 0: - if tcpp > max_cpp: - self.fwk.warning(f"task cpp ({tcpp}) exceeds maximum possible for {ppn} procs per node " - f"with {num_cores} cores per node, using {max_cpp} cpus per proc instead") - cpp = max_cpp - else: - cpp = tcpp - else: - cpp = max_cpp cpubind_flag = '--cpu-bind=cores' cmd = ' '.join([self.task_launch_cmd, nnodes_flag, str(num_nodes), @@ -558,10 +548,10 @@ def init_task_pool(self, init_task_msg): ret_dict = {} for task_name in task_dict: # handle for task related things - (nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks) = task_dict[task_name] + (nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp) = task_dict[task_name] try: - ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks, cmd_args) + ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args) except InsufficientResourcesException: continue except BadResourceRequestException as e: diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index f9ad04ce..893e84e9 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -357,7 +357,8 @@ def test_build_launch_cmd_srun(): nodes='n1,n2', accurateNodes=None, partial_nodes=False, - task_id=None) + task_id=None, + cpp=1) assert cmd == ('srun -N 2 -n 4 -c 1 --cpu-bind=cores executable ', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 1}) @@ -371,7 +372,8 @@ def test_build_launch_cmd_srun(): nodes='n1,n2', accurateNodes=None, partial_nodes=False, - task_id=None) + task_id=None, + cpp=2) assert cmd == ('srun -N 2 -n 2 -c 2 --cpu-bind=cores executable 13 42', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 2}) @@ -512,9 +514,9 @@ def test_init_task_pool_srun(tmpdir): tm.task_launch_cmd = 'srun' rm.accurateNodes = True - def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): + def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): if msg is None: - msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False) for n in range(number_of_tasks)} + msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False, tcpp) for n in range(number_of_tasks)} retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) for task_id, _, _ in retval.values(): tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', @@ -584,26 +586,76 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): assert task_id == 13 assert cmd == 'srun -N 2 -n 4 -c 1 --cpu-bind=cores exe0 arg0' + # now try with task_cpp set + + fwk.reset_mock() + retval = init_final_task_pool(1, 1, 1, 2) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 15 + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_not_called() + + fwk.reset_mock() + retval = init_final_task_pool(1, 1, 1, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 16 + assert cmd == 'srun -N 1 -n 1 -c 1 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_not_called() + + fwk.reset_mock() + retval = init_final_task_pool(1, 1, 1, 4) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 17 + assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') + + fwk.reset_mock() + retval = init_final_task_pool(2, 1, 1, 2) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 18 + assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_not_called() + + fwk.reset_mock() + retval = init_final_task_pool(2, 1, 1, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 19 + assert cmd == 'srun -N 2 -n 2 -c 1 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_not_called() + + fwk.reset_mock() + retval = init_final_task_pool(2, 1, 1, 4) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 20 + assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' + fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') + # different size tasks - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), - 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), + 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} retval = init_final_task_pool(msg=msg) assert len(retval) == 2 task_id, cmd, _ = retval['task0'] - assert task_id == 15 + assert task_id == 21 assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] - assert task_id == 16 + assert task_id == 22 assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe1 arg1' # one good task, one bad task - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), - 'task1': (5, '/dir', 'exe1', ('arg1',), 0, True, False)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), + 'task1': (5, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} with pytest.raises(BadResourceRequestException): init_final_task_pool(msg=msg) # one good task, one bad task - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), - 'task1': (3, '/dir', 'exe1', ('arg1',), 1, True, False)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), + 'task1': (3, '/dir', 'exe1', ('arg1',), 1, True, False, 0)} with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(msg=msg) From 1a43608739f68d405a8daa595c72757c96b3f084 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 19 Nov 2021 15:32:11 -0500 Subject: [PATCH 04/10] Fix OMP env_update, all must be strings --- ipsframework/taskManager.py | 2 +- tests/new/test_taskManager.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index cd4acbc8..1029f478 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -521,7 +521,7 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, cpubind_flag]) env_update = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', - 'OMP_NUM_THREADS': cpp} + 'OMP_NUM_THREADS': str(cpp)} else: self.fwk.error("invalid task launch command.") raise RuntimeError("invalid task launch command.") diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index 893e84e9..5006e4ae 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -361,7 +361,7 @@ def test_build_launch_cmd_srun(): cpp=1) assert cmd == ('srun -N 2 -n 4 -c 1 --cpu-bind=cores executable ', - {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 1}) + {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}) cmd = tm.build_launch_cmd(nproc=2, binary='executable', @@ -376,7 +376,7 @@ def test_build_launch_cmd_srun(): cpp=2) assert cmd == ('srun -N 2 -n 2 -c 2 --cpu-bind=cores executable 13 42', - {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': 2}) + {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}) def test_init_task_srun(tmpdir): From de555e1b1fb0ad2f70510c7664edd529b1dcb0d0 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Wed, 24 Nov 2021 14:44:46 -0500 Subject: [PATCH 05/10] Swap srun binding option --- ipsframework/taskManager.py | 2 +- tests/new/test_taskManager.py | 62 +++++++++++++++++------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 1029f478..242d8720 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -513,7 +513,7 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, nproc_flag, str(nproc)]) else: cpuptask_flag = '-c' - cpubind_flag = '--cpu-bind=cores' + cpubind_flag = '--hint=compute_bound' cmd = ' '.join([self.task_launch_cmd, nnodes_flag, str(num_nodes), nproc_flag, str(nproc), diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index 5006e4ae..1a935d17 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -360,7 +360,7 @@ def test_build_launch_cmd_srun(): task_id=None, cpp=1) - assert cmd == ('srun -N 2 -n 4 -c 1 --cpu-bind=cores executable ', + assert cmd == ('srun -N 2 -n 4 -c 1 --hint=compute_bound executable ', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}) cmd = tm.build_launch_cmd(nproc=2, @@ -375,7 +375,7 @@ def test_build_launch_cmd_srun(): task_id=None, cpp=2) - assert cmd == ('srun -N 2 -n 2 -c 2 --cpu-bind=cores executable 13 42', + assert cmd == ('srun -N 2 -n 2 -c 2 --hint=compute_bound executable 13 42', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}) @@ -412,29 +412,29 @@ def init_final_task(nproc, tppn, tcpt=0): task_id, cmd = init_final_task(1, 0) assert task_id == 1 - assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " task_id, cmd = init_final_task(2, 0) assert task_id == 2 - assert cmd == "srun -N 1 -n 2 -c 1 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 2 -c 1 --hint=compute_bound exe " with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task(3, 0) task_id, cmd = init_final_task(4, 0) assert task_id == 4 - assert cmd == "srun -N 2 -n 4 -c 1 --cpu-bind=cores exe " + assert cmd == "srun -N 2 -n 4 -c 1 --hint=compute_bound exe " with pytest.raises(BadResourceRequestException): init_final_task(5, 0) task_id, cmd = init_final_task(1, 1) assert task_id == 6 - assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " task_id, cmd = init_final_task(2, 1) assert task_id == 7 - assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) @@ -442,37 +442,37 @@ def init_final_task(nproc, tppn, tcpt=0): fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 2) assert task_id == 9 - assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 1) assert task_id == 10 - assert cmd == "srun -N 1 -n 1 -c 1 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 1 -c 1 --hint=compute_bound exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 4) assert task_id == 11 - assert cmd == "srun -N 1 -n 1 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " fwk.warning.assert_called_once_with("task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 2) assert task_id == 12 - assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 1) assert task_id == 13 - assert cmd == "srun -N 2 -n 2 -c 1 --cpu-bind=cores exe " + assert cmd == "srun -N 2 -n 2 -c 1 --hint=compute_bound exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 12) assert task_id == 14 - assert cmd == "srun -N 2 -n 2 -c 2 --cpu-bind=cores exe " + assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " fwk.warning.assert_called_once_with("task cpp (12) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") # start two task, second should fail with Insufficient Resources depending on block @@ -527,13 +527,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 1 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' retval = init_final_task_pool(2, 0, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 2 - assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe0 arg0' with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task_pool(3, 0, 1) @@ -542,7 +542,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 4 - assert cmd == 'srun -N 2 -n 4 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --hint=compute_bound exe0 arg0' with pytest.raises(BadResourceRequestException): init_final_task_pool(5, 0, 1) @@ -551,13 +551,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 6 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' retval = init_final_task_pool(2, 1, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 7 - assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(3, 1, 1) @@ -566,25 +566,25 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 9 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 10 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe1 arg1' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe1 arg1' retval = init_final_task_pool(2, 0, 2) assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 11 - assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 12 - assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe1 arg1' retval = init_final_task_pool(4, 0, 2) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 13 - assert cmd == 'srun -N 2 -n 4 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --hint=compute_bound exe0 arg0' # now try with task_cpp set @@ -593,7 +593,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 15 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -601,7 +601,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 16 - assert cmd == 'srun -N 1 -n 1 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 1 --hint=compute_bound exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -609,7 +609,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 17 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') fwk.reset_mock() @@ -617,7 +617,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 18 - assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -625,7 +625,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 19 - assert cmd == 'srun -N 2 -n 2 -c 1 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 1 --hint=compute_bound exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -633,7 +633,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 20 - assert cmd == 'srun -N 2 -n 2 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') # different size tasks @@ -643,10 +643,10 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 21 - assert cmd == 'srun -N 1 -n 1 -c 2 --cpu-bind=cores exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 22 - assert cmd == 'srun -N 1 -n 2 -c 1 --cpu-bind=cores exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe1 arg1' # one good task, one bad task msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), From 7883ecbce39cf2b5b2928937ef81799954a2f4ff Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 3 Dec 2021 10:14:36 -0500 Subject: [PATCH 06/10] Swap srun to force 1 thread-per-core --- ipsframework/taskManager.py | 2 +- tests/new/test_taskManager.py | 62 +++++++++++++++++------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 242d8720..4a2001c5 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -513,7 +513,7 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, nproc_flag, str(nproc)]) else: cpuptask_flag = '-c' - cpubind_flag = '--hint=compute_bound' + cpubind_flag = '--threads-per-core=1 --cpu-bind=cores' cmd = ' '.join([self.task_launch_cmd, nnodes_flag, str(num_nodes), nproc_flag, str(nproc), diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index 1a935d17..be9f0cfe 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -360,7 +360,7 @@ def test_build_launch_cmd_srun(): task_id=None, cpp=1) - assert cmd == ('srun -N 2 -n 4 -c 1 --hint=compute_bound executable ', + assert cmd == ('srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores executable ', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}) cmd = tm.build_launch_cmd(nproc=2, @@ -375,7 +375,7 @@ def test_build_launch_cmd_srun(): task_id=None, cpp=2) - assert cmd == ('srun -N 2 -n 2 -c 2 --hint=compute_bound executable 13 42', + assert cmd == ('srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores executable 13 42', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}) @@ -412,29 +412,29 @@ def init_final_task(nproc, tppn, tcpt=0): task_id, cmd = init_final_task(1, 0) assert task_id == 1 - assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " task_id, cmd = init_final_task(2, 0) assert task_id == 2 - assert cmd == "srun -N 1 -n 2 -c 1 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task(3, 0) task_id, cmd = init_final_task(4, 0) assert task_id == 4 - assert cmd == "srun -N 2 -n 4 -c 1 --hint=compute_bound exe " + assert cmd == "srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe " with pytest.raises(BadResourceRequestException): init_final_task(5, 0) task_id, cmd = init_final_task(1, 1) assert task_id == 6 - assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " task_id, cmd = init_final_task(2, 1) assert task_id == 7 - assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " with pytest.raises(ResourceRequestMismatchException): init_final_task(3, 1) @@ -442,37 +442,37 @@ def init_final_task(nproc, tppn, tcpt=0): fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 2) assert task_id == 9 - assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 1) assert task_id == 10 - assert cmd == "srun -N 1 -n 1 -c 1 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(1, 1, 4) assert task_id == 11 - assert cmd == "srun -N 1 -n 1 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 2) assert task_id == 12 - assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 1) assert task_id == 13 - assert cmd == "srun -N 2 -n 2 -c 1 --hint=compute_bound exe " + assert cmd == "srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_not_called() fwk.reset_mock() task_id, cmd = init_final_task(2, 1, 12) assert task_id == 14 - assert cmd == "srun -N 2 -n 2 -c 2 --hint=compute_bound exe " + assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe " fwk.warning.assert_called_once_with("task cpp (12) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead") # start two task, second should fail with Insufficient Resources depending on block @@ -527,13 +527,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 1 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' retval = init_final_task_pool(2, 0, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 2 - assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' with pytest.raises(ResourceRequestUnequalPartitioningException): init_final_task_pool(3, 0, 1) @@ -542,7 +542,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 4 - assert cmd == 'srun -N 2 -n 4 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' with pytest.raises(BadResourceRequestException): init_final_task_pool(5, 0, 1) @@ -551,13 +551,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 6 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' retval = init_final_task_pool(2, 1, 1) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 7 - assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(3, 1, 1) @@ -566,25 +566,25 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 9 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 10 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe1 arg1' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe1 arg1' retval = init_final_task_pool(2, 0, 2) assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 11 - assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 12 - assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' retval = init_final_task_pool(4, 0, 2) assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 13 - assert cmd == 'srun -N 2 -n 4 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' # now try with task_cpp set @@ -593,7 +593,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 15 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -601,7 +601,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 16 - assert cmd == 'srun -N 1 -n 1 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -609,7 +609,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 17 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') fwk.reset_mock() @@ -617,7 +617,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 18 - assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -625,7 +625,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 19 - assert cmd == 'srun -N 2 -n 2 -c 1 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_not_called() fwk.reset_mock() @@ -633,7 +633,7 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 1 task_id, cmd, _ = retval['task0'] assert task_id == 20 - assert cmd == 'srun -N 2 -n 2 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') # different size tasks @@ -643,10 +643,10 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert len(retval) == 2 task_id, cmd, _ = retval['task0'] assert task_id == 21 - assert cmd == 'srun -N 1 -n 1 -c 2 --hint=compute_bound exe0 arg0' + assert cmd == 'srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores exe0 arg0' task_id, cmd, _ = retval['task1'] assert task_id == 22 - assert cmd == 'srun -N 1 -n 2 -c 1 --hint=compute_bound exe1 arg1' + assert cmd == 'srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores exe1 arg1' # one good task, one bad task msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), From fd3a23bf9fe88f70d140a7ea9974c17eb985135d Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 3 Dec 2021 14:03:29 -0500 Subject: [PATCH 07/10] Add cori test for srun cpp --- tests/components/workers/cori_srun_openmp.py | 27 +++ tests/new/test_cori_srun.py | 164 +++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 tests/components/workers/cori_srun_openmp.py create mode 100644 tests/new/test_cori_srun.py diff --git a/tests/components/workers/cori_srun_openmp.py b/tests/components/workers/cori_srun_openmp.py new file mode 100644 index 00000000..1205bc60 --- /dev/null +++ b/tests/components/workers/cori_srun_openmp.py @@ -0,0 +1,27 @@ +from ipsframework import Component + + +class openmp_worker(Component): + # pylint: disable=no-member + def step(self, timestamp=0.0, **keywords): + cwd = self.services.get_working_dir() + + self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.01", errfile="err.01")) + self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.02", errfile="err.02", task_ppn=1)) + self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.03", errfile="err.03", task_ppn=1, task_cpp=32)) + + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.11", errfile="err.11")) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.12", errfile="err.12", task_ppn=4)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.13", errfile="err.13", task_ppn=4, task_cpp=8)) + + self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.21", errfile="err.21")) + self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.22", errfile="err.22", task_ppn=32)) + self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1)) + + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.31", errfile="err.31", task_ppn=8)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.33", errfile="err.32", task_ppn=4, task_cpp=4)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.32", errfile="err.33", task_ppn=4, task_cpp=2)) + + self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.41", errfile="err.41", task_ppn=8)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2)) diff --git a/tests/new/test_cori_srun.py b/tests/new/test_cori_srun.py new file mode 100644 index 00000000..aaed68ec --- /dev/null +++ b/tests/new/test_cori_srun.py @@ -0,0 +1,164 @@ +import glob +import json +import pytest +from ipsframework import Framework + + +def write_basic_config_and_platform_files(tmpdir): + platform_file = tmpdir.join('cori.platform.conf') + + platform = """MPIRUN = srun +HOST = cori +NODE_DETECTION = slurm_env +CORES_PER_NODE = 32 +SOCKETS_PER_NODE = 2 +NODE_ALLOCATION_MODE = exclusive +""" + + with open(platform_file, 'w') as f: + f.write(platform) + + config_file = tmpdir.join('ips.config') + + config = f"""RUN_COMMENT = testing +SIM_NAME = test +LOG_FILE = {str(tmpdir)}/sim.log +LOG_LEVEL = INFO +SIM_ROOT = {str(tmpdir)} +SIMULATION_MODE = NORMAL +[PORTS] + NAMES = DRIVER + [[DRIVER]] + IMPLEMENTATION = DRIVER +[DRIVER] + CLASS = OPENMP + SUB_CLASS = + NAME = openmp_worker + BIN_PATH = + NPROC = 1 + INPUT_FILES = + OUTPUT_FILES = + SCRIPT = + MODULE = components.workers.cori_srun_openmp +""" + + with open(config_file, 'w') as f: + f.write(config) + + return platform_file, config_file + + +@pytest.mark.cori +def test_srun_openmp_on_cori(tmpdir): + + platform_file, config_file = write_basic_config_and_platform_files(tmpdir) + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check simulation_log + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + comments = [json.loads(line)['comment'].split(', ', maxsplit=4)[3:] for line in json_file.readlines()] + + # check that the process output log files are created + work_dir = tmpdir.join("work").join("OPENMP__openmp_worker_1") + + # 0 + for c in (2, 4, 6): + assert comments[c][0] == "Target = srun -N 1 -n 1 -c 32 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '32'}" + + for log in ('01', '02', '03'): + lines = sorted(work_dir.join(f"log.{log}").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-63)\n') + + # 1 + for c in (8, 10, 12): + assert comments[c][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + + for log in ('11', '12', '13'): + lines = sorted(work_dir.join(f"log.{log}").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-7,32-39)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-23,48-55)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 8-15,40-47)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') + + # 2 + for c in (14, 16, 18): + assert comments[c][0] == "Target = srun -N 1 -n 32 -c 1 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}" + + for log in ('21', '22', '23'): + lines = sorted(work_dir.join(f"log.{log}").readlines(), key=lambda a: int(a.split()[3].replace(',', ''))) + for n, l in enumerate(lines): + cores = n//2 + n % 2*16 + assert lines[n].startswith(f'Hello from rank {n}') and lines[n].endswith(f'(core affinity = {cores},{cores+32})\n') + + # 31 + assert comments[20][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[20][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + + lines = sorted(work_dir.join("log.31").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-7,32-39)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-23,48-55)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 8-15,40-47)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') + + # 32 + assert comments[22][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[22][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + + lines = sorted(work_dir.join("log.32").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16,17,48,49)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 2,3,34,35)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 18,19,50,51)\n') + + # 33 + assert comments[24][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[24][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + + lines = sorted(work_dir.join("log.33").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-19,48-51)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 4-7,36-39)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') + + # openmp + + # 41 + assert comments[26][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[26][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + + lines = sorted(work_dir.join("log.41").readlines()) + for n, l in enumerate(lines): + assert l.startswith(f"Hello from rank {n//8}, thread {n%8}") + assert l.endswith(f"(core affinity = {n%8 + n//16*8 + n//8%2*16})\n") + + # 42 + assert comments[28][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[28][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + + lines = sorted(work_dir.join("log.42").readlines()) + for n, l in enumerate(lines): + assert l.startswith(f"Hello from rank {n//4}, thread {n%4}") + assert l.endswith(f"(core affinity = {n%4 + n//8*4 + n//4%2*16})\n") + + # 43 + assert comments[30][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[30][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + + lines = sorted(work_dir.join("log.43").readlines()) + for n, l in enumerate(lines): + assert l.startswith(f"Hello from rank {n//2}, thread {n%2}") + assert l.endswith(f"(core affinity = {n%2 + n//4*2 + n//2%2*16})\n") From 44b99bc431c58eac10d774e75ea2336489bf7ef6 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 9 Dec 2021 13:49:16 -0500 Subject: [PATCH 08/10] Update docs --- doc/development.rst | 8 +++++--- ipsframework/services.py | 15 ++++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/doc/development.rst b/doc/development.rst index a88c447c..6bc7bf35 100644 --- a/doc/development.rst +++ b/doc/development.rst @@ -144,7 +144,11 @@ The are some tests that only run on Cori at NERSC and these are not run as part of the :ref:`CI ` and must be run manually. To run those test you need to add the option ``--runcori`` to the ``pytest``. There are tests for the :ref:`shifter -functionally` that is Cori specific. +functionally` that is Cori specific. There are also test +for the srun command built with different ``task_ppn`` and +``task_cpp`` options in +:meth:`~ipsframework.services.ServicesProxy.launch_task`. + An example batch script for running the unit tests is: @@ -153,8 +157,6 @@ An example batch script for running the unit tests is: #!/bin/bash #SBATCH -p debug #SBATCH --nodes=1 - #SBATCH --tasks-per-node=1 - #SBATCH --cpus-per-task=32 #SBATCH -t 00:10:00 #SBATCH -C haswell #SBATCH -J pytest diff --git a/ipsframework/services.py b/ipsframework/services.py index 30b9ba91..ada3a2ed 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -554,6 +554,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): manage how the binary is launched. Keywords may be the following: * *task_ppn* : the processes per node value for this task + * *task_cpp* : the cores per process * *block* : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If ``True``, the task will be retried until it @@ -637,7 +638,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) if env_update: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command} , env = {env_update}') else: self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}') @@ -718,7 +719,7 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): task_ppn = task.keywords.get('task_ppn', self.ppn) wnodes = task.keywords.get('whole_nodes', not self.shared_nodes) wsocks = task.keywords.get('whole_sockets', not self.shared_nodes) - task_cpp = task.keywords.get('task_cpp', 0) + task_cpp = task.keywords.get('task_cpp', self.cpp) submit_dict[task_name] = (task.nproc, task.working_dir, task.binary, task.args, task_ppn, wnodes, wsocks, task_cpp) @@ -741,9 +742,13 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): active_tasks[task_name] = self._launch_task(task.nproc, task.working_dir, task_id, command, env_update, tag, task.keywords) - self._send_monitor_event('IPS_LAUNCH_TASK_POOL', - 'task_id = %s , Tag = %s , nproc = %d , Target = %s , task_name = %s' % - (str(task_id), str(tag), int(task.nproc), command, task_name)) + if env_update: + self._send_monitor_event('IPS_LAUNCH_TASK_POOL', + f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}' + f' , env = {env_update}') + else: + self._send_monitor_event('IPS_LAUNCH_TASK_POOL', + f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}') return active_tasks From 0f616e967779c0381b4c15e3d0daf4dd28ae84ce Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 9 Dec 2021 13:24:46 -0800 Subject: [PATCH 09/10] Add tests for task pool --- ipsframework/services.py | 4 +- tests/components/workers/cori_srun_openmp.py | 20 ++++- tests/new/test_cori_srun.py | 82 +++++++++++++++++--- 3 files changed, 89 insertions(+), 17 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index ada3a2ed..3dc0a24c 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -638,7 +638,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords) if env_update: - self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command} , env = {env_update}') + self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}') else: self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}') @@ -745,7 +745,7 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): if env_update: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}' - f' , env = {env_update}') + f', env = {env_update}') else: self._send_monitor_event('IPS_LAUNCH_TASK_POOL', f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}') diff --git a/tests/components/workers/cori_srun_openmp.py b/tests/components/workers/cori_srun_openmp.py index 1205bc60..e949b8ae 100644 --- a/tests/components/workers/cori_srun_openmp.py +++ b/tests/components/workers/cori_srun_openmp.py @@ -1,7 +1,7 @@ from ipsframework import Component -class openmp_worker(Component): +class openmp_task(Component): # pylint: disable=no-member def step(self, timestamp=0.0, **keywords): cwd = self.services.get_working_dir() @@ -19,9 +19,23 @@ def step(self, timestamp=0.0, **keywords): self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1)) self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.31", errfile="err.31", task_ppn=8)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.33", errfile="err.32", task_ppn=4, task_cpp=4)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.32", errfile="err.33", task_ppn=4, task_cpp=2)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.32", errfile="err.32", task_ppn=4, task_cpp=4)) + self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.33", errfile="err.33", task_ppn=4, task_cpp=2)) self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.41", errfile="err.41", task_ppn=8)) self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4)) self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2)) + + +class openmp_task_pool(Component): + # pylint: disable=no-member + def step(self, timestamp=0.0, **keywords): + cwd = self.services.get_working_dir() + + self.services.create_task_pool('pool') + + self.services.add_task('pool', 'task_1', 4, cwd, "check-mpi.gnu.cori", logfile="log.1", errfile="err.1", task_ppn=8) + self.services.add_task('pool', 'task_2', 4, cwd, "check-mpi.gnu.cori", logfile="log.2", errfile="err.2", task_ppn=4, task_cpp=4) + self.services.add_task('pool', 'task_3', 4, cwd, "check-mpi.gnu.cori", logfile="log.3", errfile="err.3", task_ppn=4, task_cpp=2) + + self.services.submit_tasks('pool') diff --git a/tests/new/test_cori_srun.py b/tests/new/test_cori_srun.py index aaed68ec..41b6c730 100644 --- a/tests/new/test_cori_srun.py +++ b/tests/new/test_cori_srun.py @@ -4,7 +4,7 @@ from ipsframework import Framework -def write_basic_config_and_platform_files(tmpdir): +def write_basic_config_and_platform_files(tmpdir, name): platform_file = tmpdir.join('cori.platform.conf') platform = """MPIRUN = srun @@ -33,7 +33,7 @@ def write_basic_config_and_platform_files(tmpdir): [DRIVER] CLASS = OPENMP SUB_CLASS = - NAME = openmp_worker + NAME = {name} BIN_PATH = NPROC = 1 INPUT_FILES = @@ -51,7 +51,7 @@ def write_basic_config_and_platform_files(tmpdir): @pytest.mark.cori def test_srun_openmp_on_cori(tmpdir): - platform_file, config_file = write_basic_config_and_platform_files(tmpdir) + platform_file, config_file = write_basic_config_and_platform_files(tmpdir, "openmp_task") framework = Framework(config_file_list=[str(config_file)], log_file_name=str(tmpdir.join('ips.log')), @@ -70,7 +70,7 @@ def test_srun_openmp_on_cori(tmpdir): comments = [json.loads(line)['comment'].split(', ', maxsplit=4)[3:] for line in json_file.readlines()] # check that the process output log files are created - work_dir = tmpdir.join("work").join("OPENMP__openmp_worker_1") + work_dir = tmpdir.join("work").join("OPENMP__openmp_task_1") # 0 for c in (2, 4, 6): @@ -119,20 +119,20 @@ def test_srun_openmp_on_cori(tmpdir): assert comments[22][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" lines = sorted(work_dir.join("log.32").readlines()) - assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') - assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16,17,48,49)\n') - assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 2,3,34,35)\n') - assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 18,19,50,51)\n') + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-19,48-51)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 4-7,36-39)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') # 33 assert comments[24][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " assert comments[24][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" lines = sorted(work_dir.join("log.33").readlines()) - assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') - assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-19,48-51)\n') - assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 4-7,36-39)\n') - assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16,17,48,49)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 2,3,34,35)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 18,19,50,51)\n') # openmp @@ -162,3 +162,61 @@ def test_srun_openmp_on_cori(tmpdir): for n, l in enumerate(lines): assert l.startswith(f"Hello from rank {n//2}, thread {n%2}") assert l.endswith(f"(core affinity = {n%2 + n//4*2 + n//2%2*16})\n") + + +@pytest.mark.cori +def test_srun_openmp_on_cori_pool(tmpdir): + + platform_file, config_file = write_basic_config_and_platform_files(tmpdir, "openmp_task_pool") + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check simulation_log + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + comments = [json.loads(line)['comment'].split(', ', maxsplit=5)[3:] for line in json_file.readlines()] + + # check that the process output log files are created + work_dir = tmpdir.join("work").join("OPENMP__openmp_task_pool_1") + + # 1 + assert comments[3][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[3][1] == "task_name = task_1" + assert comments[3][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + + lines = sorted(work_dir.join("log.1").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-7,32-39)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-23,48-55)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 8-15,40-47)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') + + # 2 + assert comments[5][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[5][1] == "task_name = task_2" + assert comments[5][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + + lines = sorted(work_dir.join("log.2").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16-19,48-51)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 4-7,36-39)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') + + # 3 + assert comments[7][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[7][1] == "task_name = task_3" + assert comments[7][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + + lines = sorted(work_dir.join("log.3").readlines()) + assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') + assert lines[1].startswith('Hello from rank 1') and lines[1].endswith('(core affinity = 16,17,48,49)\n') + assert lines[2].startswith('Hello from rank 2') and lines[2].endswith('(core affinity = 2,3,34,35)\n') + assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 18,19,50,51)\n') From b686c77418458dc035d688411695ecc21b10bd78 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 9 Dec 2021 17:24:37 -0500 Subject: [PATCH 10/10] Add more docs --- doc/development.rst | 7 +- doc/user_guides/advanced_guide.rst | 123 ++++++++++++++++++++++++++++- ipsframework/services.py | 2 +- 3 files changed, 127 insertions(+), 5 deletions(-) diff --git a/doc/development.rst b/doc/development.rst index 6bc7bf35..7b02658f 100644 --- a/doc/development.rst +++ b/doc/development.rst @@ -144,8 +144,8 @@ The are some tests that only run on Cori at NERSC and these are not run as part of the :ref:`CI ` and must be run manually. To run those test you need to add the option ``--runcori`` to the ``pytest``. There are tests for the :ref:`shifter -functionally` that is Cori specific. There are also test -for the srun command built with different ``task_ppn`` and +functionally` that is Cori specific. There are also +tests for the srun commands built with different ``task_ppn`` and ``task_cpp`` options in :meth:`~ipsframework.services.ServicesProxy.launch_task`. @@ -166,7 +166,8 @@ An example batch script for running the unit tests is: module load python/3.8-anaconda-2020.11 python -m pytest --runcori -The check the output in ``pytest.out`` to see that all the tests passed. +Then check the output in ``pytest.out`` to see that all the tests +passed. Writing Tests ~~~~~~~~~~~~~ diff --git a/doc/user_guides/advanced_guide.rst b/doc/user_guides/advanced_guide.rst index 00dfb506..2772aa14 100644 --- a/doc/user_guides/advanced_guide.rst +++ b/doc/user_guides/advanced_guide.rst @@ -291,7 +291,128 @@ Component invocation in the IPS means one component is calling another component Task Launch ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The task launch interface allows components to launch and manage the execution of (parallel) executables. Similar to the component invocation interface, the behavior of *launch_task* and the *wait_task* variants are controlled using the *block* keyword argument and different interfaces to *wait_task*. +The task launch interface allows components to launch and manage the +execution of (parallel) executables. Similar to the component +invocation interface, the behavior of +:py:meth:`~ipsframework.services.ServicesProxy.launch_task` and the +:py:meth:`~ipsframework.services.ServicesProxy.wait_task` variants are +controlled using the *block* keyword argument and different interfaces +to *wait_task*. + +The ``task_ppn`` and ``task_cpp`` options all greater control over how +commands are made. ``task_ppn`` will limit the number of task per +node, ``task_ccp`` will limit the number of cores assigned to each +process, this is only used when ``MPIRUN=srun``, if ``task_cpp`` is +not set it will be calculated automatically. + +~~~~~~~~~~~~~~ +Slurm examples +~~~~~~~~~~~~~~ + +The following examples show the behavior if you are running on a `Cori +`_ with 32 cores per node. + +Using the `check-mpi.gnu.cori +`_ +binary provided on Cori with ``nproc=8`` without specifying other +options: + +.. code-block:: python + + self.services.launch_task(8, cwd, "check-mpi.gnu.cori") + +the ``srun`` command created will be ``srun -N 1 -n 8 -c +4 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along +with settings the environment variables for OpenMP +``OMP_PLACES=threads OMP_PROC_BIND=spread OMP_NUM_THREADS=4``. The +resulting core affinity is + +.. code-block:: text + + Hello from rank 0, on nid00025. (core affinity = 0-3) + Hello from rank 1, on nid00025. (core affinity = 16-19) + Hello from rank 2, on nid00025. (core affinity = 4-7) + Hello from rank 3, on nid00025. (core affinity = 20-23) + Hello from rank 4, on nid00025. (core affinity = 8-11) + Hello from rank 5, on nid00025. (core affinity = 24-27) + Hello from rank 6, on nid00025. (core affinity = 12-15) + Hello from rank 7, on nid00025. (core affinity = 28-31) + +If you also include the option ``task_ppn=4``: + +.. code-block:: python + + self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4) + +then the command created will be ``srun -N 2 -n 8 -c +8 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along +with settings the environment variables for OpenMP +``OMP_PLACES=threads OMP_PROC_BIND=spread OMP_NUM_THREADS=8``. The +resulting core affinity is + +.. code-block:: text + + Hello from rank 0, on nid00025. (core affinity = 0-7) + Hello from rank 1, on nid00025. (core affinity = 16-23) + Hello from rank 2, on nid00025. (core affinity = 8-15) + Hello from rank 3, on nid00025. (core affinity = 24-31) + Hello from rank 4, on nid00026. (core affinity = 0-7) + Hello from rank 5, on nid00026. (core affinity = 16-23) + Hello from rank 6, on nid00026. (core affinity = 8-15) + Hello from rank 7, on nid00026. (core affinity = 24-31) + +You can limit the ``--cpus-per-task`` of the ``srun`` command by +setting ``task_cpp``, adding ``task_cpp=2`` + +.. code-block:: python + + self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2) + +will create the command ``srun -N 2 -n 8 -c +2 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` and set +``OMP_PLACES=threads OMP_PROC_BIND=spread OMP_NUM_THREADS=2``. This +will result in under-utilizing the nodes, which may be needed if your +task is memory bound. The resulting core affinity is + +.. code-block:: text + + Hello from rank 0, on nid00025. (core affinity = 0,1) + Hello from rank 1, on nid00025. (core affinity = 16,17) + Hello from rank 2, on nid00025. (core affinity = 2,3) + Hello from rank 3, on nid00025. (core affinity = 18,19) + Hello from rank 4, on nid00026. (core affinity = 0,1) + Hello from rank 5, on nid00026. (core affinity = 16,17) + Hello from rank 6, on nid00026. (core affinity = 2,3) + Hello from rank 7, on nid00026. (core affinity = 18,19) + +Using the `check-hybrid.gnu.cori +`_ +binary with the same options: + +.. code-block:: python + + self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2) + +the resulting core affinity of the OpenMP threads are: + +.. code-block:: text + + Hello from rank 0, thread 0, on nid00025. (core affinity = 0) + Hello from rank 0, thread 1, on nid00025. (core affinity = 1) + Hello from rank 1, thread 0, on nid00025. (core affinity = 16) + Hello from rank 1, thread 1, on nid00025. (core affinity = 17) + Hello from rank 2, thread 0, on nid00025. (core affinity = 2) + Hello from rank 2, thread 1, on nid00025. (core affinity = 3) + Hello from rank 3, thread 0, on nid00025. (core affinity = 18) + Hello from rank 3, thread 1, on nid00025. (core affinity = 19) + Hello from rank 4, thread 0, on nid00026. (core affinity = 0) + Hello from rank 4, thread 1, on nid00026. (core affinity = 1) + Hello from rank 5, thread 0, on nid00026. (core affinity = 16) + Hello from rank 5, thread 1, on nid00026. (core affinity = 17) + Hello from rank 6, thread 0, on nid00026. (core affinity = 2) + Hello from rank 6, thread 1, on nid00026. (core affinity = 3) + Hello from rank 7, thread 0, on nid00026. (core affinity = 18) + Hello from rank 7, thread 1, on nid00026. (core affinity = 19) .. automethod:: ipsframework.services.ServicesProxy.launch_task :noindex: diff --git a/ipsframework/services.py b/ipsframework/services.py index 3dc0a24c..4a6f1c4b 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -554,7 +554,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): manage how the binary is launched. Keywords may be the following: * *task_ppn* : the processes per node value for this task - * *task_cpp* : the cores per process + * *task_cpp* : the cores per process, only used when ``MPIRUN=srun`` commands * *block* : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If ``True``, the task will be retried until it