diff --git a/doc/user_guides/advanced_guide.rst b/doc/user_guides/advanced_guide.rst index bc42e7c0..7316bb9a 100644 --- a/doc/user_guides/advanced_guide.rst +++ b/doc/user_guides/advanced_guide.rst @@ -325,12 +325,13 @@ The following examples show the behavior if you are running on a `Cori Using the `check-mpi.gnu.cori `_ -binary provided on Cori with ``nproc=8`` without specifying other -options: +binary provided on Cori with ``nproc=8`` and settings the correct OMP +environment variables with ``omp=True`` without specifying other +options : .. code-block:: python - self.services.launch_task(8, cwd, "check-mpi.gnu.cori") + self.services.launch_task(8, cwd, "check-mpi.gnu.cori", omp=True) the ``srun`` command created will be ``srun -N 1 -n 8 -c 4 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along @@ -353,7 +354,7 @@ If you also include the option ``task_ppn=4``: .. code-block:: python - self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4) + self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, omp=True) then the command created will be ``srun -N 2 -n 8 -c 8 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along @@ -377,7 +378,7 @@ setting ``task_cpp``, adding ``task_cpp=2`` .. code-block:: python - self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2) + self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2, omp=True) will create the command ``srun -N 2 -n 8 -c 2 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` and set @@ -402,7 +403,7 @@ binary with the same options: .. code-block:: python - self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2) + self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2, omp=True) the resulting core affinity of the OpenMP threads are: diff --git a/ipsframework/services.py b/ipsframework/services.py index dc26e062..2827a3d4 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -591,6 +591,8 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): * *task_ppn* : the processes per node value for this task * *task_cpp* : the cores per process, only used when ``MPIRUN=srun`` commands + * *omp* : If ``True`` the task will be launch with the correct OpenMP environment + variables set, only used when ``MPIRUN=srun`` * *block* : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If ``True``, the task will be retried until it @@ -655,6 +657,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): task_ppn = keywords.get('task_ppn', self.ppn) task_cpp = keywords.get('task_cpp', self.cpp) + omp = keywords.get('omp', False) block = keywords.get('block', True) tag = keywords.get('tag', 'None') @@ -666,7 +669,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): msg_id = self._invoke_service(self.fwk.component_id, 'init_task', nproc, binary_fullpath, working_dir, task_ppn, block, - whole_nodes, whole_socks, task_cpp, *args) + whole_nodes, whole_socks, task_cpp, omp, *args) (task_id, command, env_update, cores_allocated) = self._get_service_response(msg_id, block=True) except Exception: raise @@ -758,9 +761,10 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): wnodes = task.keywords.get('whole_nodes', not self.shared_nodes) wsocks = task.keywords.get('whole_sockets', not self.shared_nodes) task_cpp = task.keywords.get('task_cpp', self.cpp) + omp = task.keywords.get('omp', False) submit_dict[task_name] = (task.nproc, task.working_dir, task.binary, task.args, - task_ppn, wnodes, wsocks, task_cpp) + task_ppn, wnodes, wsocks, task_cpp, omp) try: msg_id = self._invoke_service(self.fwk.component_id, diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 2ef07713..2c052a02 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -223,12 +223,13 @@ def init_task(self, init_task_msg): wnodes = init_task_msg.args[5] wsocks = init_task_msg.args[6] tcpp = init_task_msg.args[7] + omp = init_task_msg.args[8] # SIMYAN: increased arguments - cmd_args = init_task_msg.args[8:] + cmd_args = init_task_msg.args[9:] try: - return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args) + return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args) except InsufficientResourcesException: if block: raise BlockedMessageException(init_task_msg, '***%s waiting for %d resources' % @@ -246,7 +247,7 @@ def init_task(self, init_task_msg): except Exception: raise - def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args): + def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args): # handle for task related things task_id = self.get_task_id() @@ -282,7 +283,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, max_ppn, nodes, accurateNodes, False, task_id, - cpp) + cpp, omp) self.curr_task_table[task_id] = {'component': caller_id, 'status': 'init_task', @@ -296,7 +297,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, - task_id, cpp=0, core_list=''): + task_id, cpp=0, omp=False, core_list=''): """ Construct task launch command to be executed by the component. @@ -519,9 +520,10 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn, nproc_flag, str(nproc), cpuptask_flag, str(cpp), cpubind_flag]) - env_update = {'OMP_PLACES': 'threads', - 'OMP_PROC_BIND': 'spread', - 'OMP_NUM_THREADS': str(cpp)} + if omp: + env_update = {'OMP_PLACES': 'threads', + 'OMP_PROC_BIND': 'spread', + 'OMP_NUM_THREADS': str(cpp)} else: self.fwk.error("invalid task launch command.") raise RuntimeError("invalid task launch command.") @@ -548,10 +550,10 @@ def init_task_pool(self, init_task_msg): ret_dict = {} for task_name in task_dict: # handle for task related things - (nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp) = task_dict[task_name] + (nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp, omp) = task_dict[task_name] try: - ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args) + ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args) except InsufficientResourcesException: continue except BadResourceRequestException as e: diff --git a/tests/components/workers/cori_srun_openmp.py b/tests/components/workers/cori_srun_openmp.py index e949b8ae..c372434a 100644 --- a/tests/components/workers/cori_srun_openmp.py +++ b/tests/components/workers/cori_srun_openmp.py @@ -6,25 +6,28 @@ class openmp_task(Component): def step(self, timestamp=0.0, **keywords): cwd = self.services.get_working_dir() - self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.01", errfile="err.01")) - self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.02", errfile="err.02", task_ppn=1)) - self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.03", errfile="err.03", task_ppn=1, task_cpp=32)) + mpi = "/usr/common/software/bin/check-mpi.gnu.cori" + hybrid = "/usr/common/software/bin/check-hybrid.gnu.cori" - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.11", errfile="err.11")) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.12", errfile="err.12", task_ppn=4)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.13", errfile="err.13", task_ppn=4, task_cpp=8)) + self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.01", errfile="err.01", omp=True)) + self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.02", errfile="err.02", task_ppn=1, omp=True)) + self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.03", errfile="err.03", task_ppn=1, task_cpp=32, omp=True)) - self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.21", errfile="err.21")) - self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.22", errfile="err.22", task_ppn=32)) - self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.11", errfile="err.11", omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.12", errfile="err.12", task_ppn=4, omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.13", errfile="err.13", task_ppn=4, task_cpp=8, omp=True)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.31", errfile="err.31", task_ppn=8)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.32", errfile="err.32", task_ppn=4, task_cpp=4)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.33", errfile="err.33", task_ppn=4, task_cpp=2)) + self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.21", errfile="err.21", omp=True)) + self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.22", errfile="err.22", task_ppn=32, omp=True)) + self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1, omp=True)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.41", errfile="err.41", task_ppn=8)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4)) - self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.31", errfile="err.31", task_ppn=8, omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.32", errfile="err.32", task_ppn=4, task_cpp=4, omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.33", errfile="err.33", task_ppn=4, task_cpp=2, omp=True)) + + self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.41", errfile="err.41", task_ppn=8, omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4, omp=True)) + self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2, omp=True)) class openmp_task_pool(Component): @@ -34,8 +37,9 @@ def step(self, timestamp=0.0, **keywords): self.services.create_task_pool('pool') - self.services.add_task('pool', 'task_1', 4, cwd, "check-mpi.gnu.cori", logfile="log.1", errfile="err.1", task_ppn=8) - self.services.add_task('pool', 'task_2', 4, cwd, "check-mpi.gnu.cori", logfile="log.2", errfile="err.2", task_ppn=4, task_cpp=4) - self.services.add_task('pool', 'task_3', 4, cwd, "check-mpi.gnu.cori", logfile="log.3", errfile="err.3", task_ppn=4, task_cpp=2) + mpi = "/usr/common/software/bin/check-mpi.gnu.cori" + self.services.add_task('pool', 'task_1', 4, cwd, mpi, logfile="log.1", errfile="err.1", task_ppn=8, omp=True) + self.services.add_task('pool', 'task_2', 4, cwd, mpi, logfile="log.2", errfile="err.2", task_ppn=4, task_cpp=4, omp=True) + self.services.add_task('pool', 'task_3', 4, cwd, mpi, logfile="log.3", errfile="err.3", task_ppn=4, task_cpp=2, omp=True) self.services.submit_tasks('pool') diff --git a/tests/new/test_cori_srun.py b/tests/new/test_cori_srun.py index 7543fdab..c5a8820c 100644 --- a/tests/new/test_cori_srun.py +++ b/tests/new/test_cori_srun.py @@ -73,7 +73,7 @@ def test_srun_openmp_on_cori(tmpdir): work_dir = tmpdir.join("work").join("OPENMP__openmp_task_1") # 0 - for c in (2, 4, 6): + for c in (5, 7, 9): assert comments[c][0] == "Target = srun -N 1 -n 1 -c 32 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '32'}" @@ -82,7 +82,7 @@ def test_srun_openmp_on_cori(tmpdir): assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-63)\n') # 1 - for c in (8, 10, 12): + for c in (11, 13, 15): assert comments[c][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" @@ -94,7 +94,7 @@ def test_srun_openmp_on_cori(tmpdir): assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') # 2 - for c in (14, 16, 18): + for c in (17, 19, 21): assert comments[c][0] == "Target = srun -N 1 -n 32 -c 1 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " assert comments[c][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}" @@ -105,8 +105,8 @@ def test_srun_openmp_on_cori(tmpdir): assert l.startswith(f'Hello from rank {n}') and l.endswith(f'(core affinity = {cores},{cores+32})\n') # 31 - assert comments[20][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[20][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + assert comments[23][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[23][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" lines = sorted(work_dir.join("log.31").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-7,32-39)\n') @@ -115,8 +115,8 @@ def test_srun_openmp_on_cori(tmpdir): assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') # 32 - assert comments[22][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[22][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + assert comments[25][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[25][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" lines = sorted(work_dir.join("log.32").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') @@ -125,8 +125,8 @@ def test_srun_openmp_on_cori(tmpdir): assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') # 33 - assert comments[24][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[24][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + assert comments[27][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[27][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" lines = sorted(work_dir.join("log.33").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') @@ -137,8 +137,8 @@ def test_srun_openmp_on_cori(tmpdir): # openmp # 41 - assert comments[26][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " - assert comments[26][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + assert comments[29][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[29][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" lines = sorted(work_dir.join("log.41").readlines()) for n, l in enumerate(lines): @@ -146,8 +146,8 @@ def test_srun_openmp_on_cori(tmpdir): assert l.endswith(f"(core affinity = {n%8 + n//16*8 + n//8%2*16})\n") # 42 - assert comments[28][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " - assert comments[28][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + assert comments[31][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[31][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" lines = sorted(work_dir.join("log.42").readlines()) for n, l in enumerate(lines): @@ -155,8 +155,8 @@ def test_srun_openmp_on_cori(tmpdir): assert l.endswith(f"(core affinity = {n%4 + n//8*4 + n//4%2*16})\n") # 43 - assert comments[30][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " - assert comments[30][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + assert comments[33][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-hybrid.gnu.cori " + assert comments[33][1] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" lines = sorted(work_dir.join("log.43").readlines()) for n, l in enumerate(lines): @@ -189,9 +189,9 @@ def test_srun_openmp_on_cori_pool(tmpdir): work_dir = tmpdir.join("work").join("OPENMP__openmp_task_pool_1") # 1 - assert comments[3][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[3][1] == "task_name = task_1" - assert comments[3][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" + assert comments[6][0] == "Target = srun -N 1 -n 4 -c 8 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[6][1] == "task_name = task_1" + assert comments[6][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '8'}" lines = sorted(work_dir.join("log.1").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-7,32-39)\n') @@ -200,9 +200,9 @@ def test_srun_openmp_on_cori_pool(tmpdir): assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 24-31,56-63)\n') # 2 - assert comments[5][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[5][1] == "task_name = task_2" - assert comments[5][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" + assert comments[8][0] == "Target = srun -N 1 -n 4 -c 4 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[8][1] == "task_name = task_2" + assert comments[8][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '4'}" lines = sorted(work_dir.join("log.2").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0-3,32-35)\n') @@ -211,9 +211,9 @@ def test_srun_openmp_on_cori_pool(tmpdir): assert lines[3].startswith('Hello from rank 3') and lines[3].endswith('(core affinity = 20-23,52-55)\n') # 3 - assert comments[7][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " - assert comments[7][1] == "task_name = task_3" - assert comments[7][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" + assert comments[10][0] == "Target = srun -N 1 -n 4 -c 2 --threads-per-core=1 --cpu-bind=cores /usr/common/software/bin/check-mpi.gnu.cori " + assert comments[10][1] == "task_name = task_3" + assert comments[10][2] == "env = {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}" lines = sorted(work_dir.join("log.3").readlines()) assert lines[0].startswith('Hello from rank 0') and lines[0].endswith('(core affinity = 0,1,32,33)\n') diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index c73c8d41..462416c9 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -358,7 +358,38 @@ def test_build_launch_cmd_srun(): accurateNodes=None, partial_nodes=False, task_id=None, - cpp=1) + cpp=1, + omp=False) + + assert cmd == ('srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores executable ', None) + + cmd = tm.build_launch_cmd(nproc=2, + binary='executable', + cmd_args=('13', '42'), + working_dir=None, + ppn=1, + max_ppn=None, + nodes='n1,n2', + accurateNodes=None, + partial_nodes=False, + task_id=None, + cpp=2, + omp=False) + + assert cmd == ('srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores executable 13 42', None) + + cmd = tm.build_launch_cmd(nproc=4, + binary='executable', + cmd_args=(), + working_dir=None, + ppn=2, + max_ppn=None, + nodes='n1,n2', + accurateNodes=None, + partial_nodes=False, + task_id=None, + cpp=1, + omp=True) assert cmd == ('srun -N 2 -n 4 -c 1 --threads-per-core=1 --cpu-bind=cores executable ', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '1'}) @@ -373,7 +404,8 @@ def test_build_launch_cmd_srun(): accurateNodes=None, partial_nodes=False, task_id=None, - cpp=2) + cpp=2, + omp=True) assert cmd == ('srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores executable 13 42', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}) @@ -405,7 +437,7 @@ def test_init_task_srun(tmpdir): def init_final_task(nproc, tppn, tcpt=0): task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', nproc, 'exe', '/dir', tppn, True, - True, True, tcpt)) + True, True, tcpt, False)) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return task_id, cmd, cores_allocated @@ -483,17 +515,17 @@ def init_final_task(nproc, tppn, tcpt=0): # start two task, second should fail with Insufficient Resources depending on block task_id, cmd, _, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 4, 'exe', '/dir', 0, True, - True, True, 0)) + True, True, 0, False)) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 1, 'exe', '/dir', 0, True, - True, True, 0)) + True, True, 0, False)) with pytest.raises(InsufficientResourcesException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', 1, 'exe', '/dir', 0, False, - True, True, 0)) + True, True, 0, False)) def test_init_task_pool_srun(tmpdir): @@ -521,7 +553,7 @@ def test_init_task_pool_srun(tmpdir): def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): if msg is None: - msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False, tcpp) for n in range(number_of_tasks)} + msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False, tcpp, False) for n in range(number_of_tasks)} retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) for task_id, _, _, _ in retval.values(): tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', @@ -650,8 +682,8 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') # different size tasks - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), - 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0, False), + 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False, 0, False)} retval = init_final_task_pool(msg=msg) assert len(retval) == 2 task_id, cmd, _, cores = retval['task0'] @@ -664,13 +696,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert cores == 2 # one good task, one bad task - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), - 'task1': (5, '/dir', 'exe1', ('arg1',), 0, True, False, 0)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0, False), + 'task1': (5, '/dir', 'exe1', ('arg1',), 0, True, False, 0, False)} with pytest.raises(BadResourceRequestException): init_final_task_pool(msg=msg) # one good task, one bad task - msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0), - 'task1': (3, '/dir', 'exe1', ('arg1',), 1, True, False, 0)} + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False, 0, False), + 'task1': (3, '/dir', 'exe1', ('arg1',), 1, True, False, 0, False)} with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(msg=msg)