Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions doc/user_guides/advanced_guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ The following examples show the behavior if you are running on a `Cori

Using the `check-mpi.gnu.cori
<https://docs.nersc.gov/jobs/affinity/#use-nersc-prebuilt-binaries>`_
binary provided on Cori with ``nproc=8`` without specifying other
options:
binary provided on Cori with ``nproc=8`` and setting the correct OMP
environment variables with ``omp=True``, without specifying other
options:

.. code-block:: python

self.services.launch_task(8, cwd, "check-mpi.gnu.cori")
self.services.launch_task(8, cwd, "check-mpi.gnu.cori", omp=True)

the ``srun`` command created will be ``srun -N 1 -n 8 -c
4 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along
Expand All @@ -353,7 +354,7 @@ If you also include the option ``task_ppn=4``:

.. code-block:: python

self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4)
self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, omp=True)

then the command created will be ``srun -N 2 -n 8 -c
8 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` along
Expand All @@ -377,7 +378,7 @@ setting ``task_cpp``, adding ``task_cpp=2``

.. code-block:: python

self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2)
self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2, omp=True)

will create the command ``srun -N 2 -n 8 -c
2 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` and set
Expand All @@ -402,7 +403,7 @@ binary with the same options:

.. code-block:: python

self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2)
self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2, omp=True)

the resulting core affinity of the OpenMP threads is:

Expand Down
8 changes: 6 additions & 2 deletions ipsframework/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):

* *task_ppn* : the processes per node value for this task
* *task_cpp* : the cores per process, only used when ``MPIRUN=srun``
* *omp* : If ``True`` the task will be launched with the correct OpenMP environment
variables set, only used when ``MPIRUN=srun``
* *block* : specifies that this task will block (or raise an
exception) if not enough resources are available to run
immediately. If ``True``, the task will be retried until it
Expand Down Expand Up @@ -655,6 +657,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):

task_ppn = keywords.get('task_ppn', self.ppn)
task_cpp = keywords.get('task_cpp', self.cpp)
omp = keywords.get('omp', False)
block = keywords.get('block', True)
tag = keywords.get('tag', 'None')

Expand All @@ -666,7 +669,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):
msg_id = self._invoke_service(self.fwk.component_id,
'init_task', nproc, binary_fullpath,
working_dir, task_ppn, block,
whole_nodes, whole_socks, task_cpp, *args)
whole_nodes, whole_socks, task_cpp, omp, *args)
(task_id, command, env_update, cores_allocated) = self._get_service_response(msg_id, block=True)
except Exception:
raise
Expand Down Expand Up @@ -758,9 +761,10 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0):
wnodes = task.keywords.get('whole_nodes', not self.shared_nodes)
wsocks = task.keywords.get('whole_sockets', not self.shared_nodes)
task_cpp = task.keywords.get('task_cpp', self.cpp)
omp = task.keywords.get('omp', False)
submit_dict[task_name] = (task.nproc, task.working_dir,
task.binary, task.args,
task_ppn, wnodes, wsocks, task_cpp)
task_ppn, wnodes, wsocks, task_cpp, omp)

try:
msg_id = self._invoke_service(self.fwk.component_id,
Expand Down
22 changes: 12 additions & 10 deletions ipsframework/taskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,13 @@ def init_task(self, init_task_msg):
wnodes = init_task_msg.args[5]
wsocks = init_task_msg.args[6]
tcpp = init_task_msg.args[7]
omp = init_task_msg.args[8]

# SIMYAN: increased arguments
cmd_args = init_task_msg.args[8:]
cmd_args = init_task_msg.args[9:]

try:
return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args)
return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args)
except InsufficientResourcesException:
if block:
raise BlockedMessageException(init_task_msg, '***%s waiting for %d resources' %
Expand All @@ -246,7 +247,7 @@ def init_task(self, init_task_msg):
except Exception:
raise

def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args):
def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args):
# handle for task related things
task_id = self.get_task_id()

Expand Down Expand Up @@ -282,7 +283,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes,
max_ppn, nodes,
accurateNodes,
False, task_id,
cpp)
cpp, omp)

self.curr_task_table[task_id] = {'component': caller_id,
'status': 'init_task',
Expand All @@ -296,7 +297,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes,

def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
max_ppn, nodes, accurateNodes, partial_nodes,
task_id, cpp=0, core_list=''):
task_id, cpp=0, omp=False, core_list=''):
"""
Construct task launch command to be executed by the component.

Expand Down Expand Up @@ -519,9 +520,10 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
nproc_flag, str(nproc),
cpuptask_flag, str(cpp),
cpubind_flag])
env_update = {'OMP_PLACES': 'threads',
'OMP_PROC_BIND': 'spread',
'OMP_NUM_THREADS': str(cpp)}
if omp:
env_update = {'OMP_PLACES': 'threads',
'OMP_PROC_BIND': 'spread',
'OMP_NUM_THREADS': str(cpp)}
else:
self.fwk.error("invalid task launch command.")
raise RuntimeError("invalid task launch command.")
Expand All @@ -548,10 +550,10 @@ def init_task_pool(self, init_task_msg):
ret_dict = {}
for task_name in task_dict:
# handle for task related things
(nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp) = task_dict[task_name]
(nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp, omp) = task_dict[task_name]

try:
ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args)
ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args)
except InsufficientResourcesException:
continue
except BadResourceRequestException as e:
Expand Down
40 changes: 22 additions & 18 deletions tests/components/workers/cori_srun_openmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ class openmp_task(Component):
def step(self, timestamp=0.0, **keywords):
cwd = self.services.get_working_dir()

self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.01", errfile="err.01"))
self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.02", errfile="err.02", task_ppn=1))
self.services.wait_task(self.services.launch_task(1, cwd, "check-mpi.gnu.cori", logfile="log.03", errfile="err.03", task_ppn=1, task_cpp=32))
mpi = "/usr/common/software/bin/check-mpi.gnu.cori"
hybrid = "/usr/common/software/bin/check-hybrid.gnu.cori"

self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.11", errfile="err.11"))
self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.12", errfile="err.12", task_ppn=4))
self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.13", errfile="err.13", task_ppn=4, task_cpp=8))
self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.01", errfile="err.01", omp=True))
self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.02", errfile="err.02", task_ppn=1, omp=True))
self.services.wait_task(self.services.launch_task(1, cwd, mpi, logfile="log.03", errfile="err.03", task_ppn=1, task_cpp=32, omp=True))

self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.21", errfile="err.21"))
self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.22", errfile="err.22", task_ppn=32))
self.services.wait_task(self.services.launch_task(32, cwd, "check-mpi.gnu.cori", logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.11", errfile="err.11", omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.12", errfile="err.12", task_ppn=4, omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.13", errfile="err.13", task_ppn=4, task_cpp=8, omp=True))

self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.31", errfile="err.31", task_ppn=8))
self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.32", errfile="err.32", task_ppn=4, task_cpp=4))
self.services.wait_task(self.services.launch_task(4, cwd, "check-mpi.gnu.cori", logfile="log.33", errfile="err.33", task_ppn=4, task_cpp=2))
self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.21", errfile="err.21", omp=True))
self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.22", errfile="err.22", task_ppn=32, omp=True))
self.services.wait_task(self.services.launch_task(32, cwd, mpi, logfile="log.23", errfile="err.23", task_ppn=32, task_cpp=1, omp=True))

self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.41", errfile="err.41", task_ppn=8))
self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4))
self.services.wait_task(self.services.launch_task(4, cwd, "check-hybrid.gnu.cori", logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.31", errfile="err.31", task_ppn=8, omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.32", errfile="err.32", task_ppn=4, task_cpp=4, omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, mpi, logfile="log.33", errfile="err.33", task_ppn=4, task_cpp=2, omp=True))

self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.41", errfile="err.41", task_ppn=8, omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.42", errfile="err.42", task_ppn=4, task_cpp=4, omp=True))
self.services.wait_task(self.services.launch_task(4, cwd, hybrid, logfile="log.43", errfile="err.43", task_ppn=4, task_cpp=2, omp=True))


class openmp_task_pool(Component):
Expand All @@ -34,8 +37,9 @@ def step(self, timestamp=0.0, **keywords):

self.services.create_task_pool('pool')

self.services.add_task('pool', 'task_1', 4, cwd, "check-mpi.gnu.cori", logfile="log.1", errfile="err.1", task_ppn=8)
self.services.add_task('pool', 'task_2', 4, cwd, "check-mpi.gnu.cori", logfile="log.2", errfile="err.2", task_ppn=4, task_cpp=4)
self.services.add_task('pool', 'task_3', 4, cwd, "check-mpi.gnu.cori", logfile="log.3", errfile="err.3", task_ppn=4, task_cpp=2)
mpi = "/usr/common/software/bin/check-mpi.gnu.cori"
self.services.add_task('pool', 'task_1', 4, cwd, mpi, logfile="log.1", errfile="err.1", task_ppn=8, omp=True)
self.services.add_task('pool', 'task_2', 4, cwd, mpi, logfile="log.2", errfile="err.2", task_ppn=4, task_cpp=4, omp=True)
self.services.add_task('pool', 'task_3', 4, cwd, mpi, logfile="log.3", errfile="err.3", task_ppn=4, task_cpp=2, omp=True)

self.services.submit_tasks('pool')
Loading