8 import os, socket, logging
11 from .mpi_impl_base
import MPIMixed,CMDFGen
13 module_logger=logging.getLogger(
'lsf_cray_intel')
20 """!Calls sync and aprun -B sync. This is meant to be run before
21 aprunning another program."""
23 super(Syncer,self).__init__()
24 def __call__(self,runner=None,logger=None):
25 if logger
is None: logger=module_logger
26 logger.info(
"Running sync...")
28 logger.info(
'Running aprun -B sync...')
32 """!Runs the "sync" command as an exe()."""
def openmp(arg,threads=None):
    """!Adds OpenMP support to the provided object

    @param arg An produtil.prog.Runner or
    produtil.mpiprog.MPIRanksBase object tree
    @param threads the number of threads, or threads per rank, an
    integer, or None to guess from the environment
    @returns arg, possibly modified or replaced, with OpenMP settings

    NOTE(review): definition reconstructed from a garbled extraction;
    confirm against upstream produtil.mpi_impl.lsf_cray_intel."""
    if threads is None:
        # No thread count given: fall back on $OMP_NUM_THREADS if it
        # holds a usable positive integer.
        try:
            ont=os.environ.get('OMP_NUM_THREADS','')
            ont=int(ont)
            if ont>0:
                threads=ont
        except (KeyError,TypeError,ValueError) as e:
            pass  # unset or non-numeric; fall through to node-size guess
    if threads is None:
        # Last resort: node size from $PRODUTIL_RUN_NODESIZE (default
        # 24 cores) minus one core left for the operating system.
        nodesize=os.environ.get('PRODUTIL_RUN_NODESIZE','24')
        nodesize=int(nodesize)
        threads=max(1,nodesize-1)
    threads=int(threads)
    if hasattr(arg,'argins'):
        # aprun needs "-cc depth -d <threads>" inserted just after the
        # aprun path so each rank's threads share cores correctly.
        # NOTE(review): hasattr guards reconstructed - confirm upstream.
        module_logger.info('Threaded with %s threads so add -cc depth.'%(
            repr(threads),))
        for a in reversed(['-cc','depth','-d',str(threads)]):
            arg=arg.argins(1,a)
    if hasattr(arg,'env'):
        # Intel OpenMP: let aprun own thread placement, not the OpenMP
        # runtime, hence KMP_AFFINITY=disabled.
        arg=arg.env(KMP_AFFINITY='disabled',OMP_NUM_THREADS=threads)
    return arg
def aprun_ln_sf(source,target,content,logger=None):
    """!Symlinks source to target by running "/bin/ln -sf" on a compute
    node via aprun, then verifies the result.

    Used in place of produtil.fileop.ln_sf on Cray, where a link made
    on a login node may not be usable from compute nodes.

    @param source the file to link to
    @param target the symlink to create
    @param content ignored on input; overwritten with
      os.readlink(target) (kept for signature compatibility with
      produtil.fileop.ln_sf)
    @param logger a logging.Logger for log messages
    @returns True if the symlink now points to source
    @raise produtil.fileop.WrongSymlink if the link points elsewhere

    NOTE(review): reconstructed from a garbled extraction; confirm the
    launcher entry point (produtil.pipeline.simple_run) upstream."""
    if logger is None: logger=module_logger
    # One rank on one node (-n 1 -N 1), no hyperthreading (-j 1).
    produtil.pipeline.simple_run(
        [aprun_path,"-N","1","-n","1","-j","1","/bin/ln","-sf",
         str(source),str(target)], logger=logger)
    content=os.readlink(target)
    if content==source:
        return True
    raise produtil.fileop.WrongSymlink(
        "FILESYSTEM FAILURE: Python and aprun ln -sf both cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
            target,source,content),target)
def detect():
    """!Determines if Cray aprun should be used to run MPI programs by
    looking for the aprun program in $PATH

    @returns True if aprun was found, False otherwise.

    NOTE(review): def line reconstructed from a garbled extraction;
    confirm against upstream produtil.mpi_impl.lsf_cray_intel."""
    detected=aprun_path is not None
    if detected:
        # On Cray, symlinks must be made from a compute node to be
        # reliably visible there, so monkey-patch produtil.fileop's
        # ln_sf to run through aprun. NOTE(review): the "if detected"
        # guard is reconstructed - confirm the override is conditional.
        produtil.fileop.ln_sf = aprun_ln_sf
    return detected
def can_run_mpi():
    """!Does this module represent an MPI implementation? Returns True."""
    return True
def make_bigexe(exe,**kwargs):
    """!Returns an ImmutableRunner that will run the specified program.

    @note This function does NOT search $PATH. That ensures the $PATH
    will be expanded on the compute node instead. Use
    produtil.fileop.find_exe() if you want to explicitly search the
    PATH before execution.
    @param exe The executable to run on compute nodes.
    @param kwargs Ignored.
    @returns a produtil.prog.ImmutableRunner for exe

    NOTE(review): runner construction reconstructed from a garbled
    extraction; confirm against upstream."""
    # Track aprun nesting depth so code running inside an aprun-launched
    # program can detect that fact via $INSIDE_APRUN.
    inside_aprun=int(os.environ.get('INSIDE_APRUN','0'))
    inside_aprun+=1  # NOTE(review): increment reconstructed - confirm upstream
    return produtil.prog.ImmutableRunner([str(exe)],**kwargs)\
        .env(INSIDE_APRUN=str(inside_aprun))
def mpirunner(arg,allranks=False,logger=None,**kwargs):
    """!Delegates to mpirunner2() and logs the mapping from the MPI
    program specification to the resulting Runner.

    @param arg a tree of produtil.mpiprog.MPIRanksBase objects
    @param allranks if True, use all available MPI ranks
    @param logger a logging.Logger for log messages
    @param kwargs passed to mpirunner2()
    @returns the produtil.prog.Runner from mpirunner2()"""
    # Fix: default logger=None would crash logger.info() below, and the
    # computed Runner must be returned to the caller.
    # NOTE(review): guard reconstructed; upstream may instead skip
    # logging when logger is None - confirm.
    if logger is None:
        logger=module_logger
    m=mpirunner2(arg,allranks=allranks,logger=logger,**kwargs)
    logger.info('mpirunner: %s => %s'%(repr(arg),repr(m)))
    return m
117 """!Turns a produtil.mpiprog.MPIRanksBase tree into a produtil.prog.Runner
118 @param arg a tree of produtil.mpiprog.MPIRanksBase objects
119 @param allranks if True, and only one rank is requested by arg, then
120 all MPI ranks will be used
121 @param logger a logging.Logger for log messages
122 @param kwargs passed to produtil.mpi_impl.mpi_impl_base.CMDFGen
123 when mpiserial is in use.
124 @returns a produtil.prog.Runner that will run the selected MPI program"""
126 logger=logging.getLogger(
'lsf_cray_intel')
128 (serial,parallel)=arg.check_serial()
129 if serial
and parallel:
131 'Cannot mix serial and parallel MPI ranks in the same '
133 nodesize=os.environ.get(
'PRODUTIL_RUN_NODESIZE',
'24')
134 nodesize=int(nodesize)
135 hyperthreads=os.environ.get(
'PRODUTIL_RUN_HYPERTHREADS',
'1')
136 hyperthreads=int(hyperthreads)
137 maxtasks=os.environ[
'TOTAL_TASKS']
138 maxtasks=int(maxtasks)
143 logger.info(
'Decide what to do with -cc option.')
144 if threads
is not None and threads>1:
145 logger.info(
'Threaded with %s threads so add -cc depth.'%(
147 runner[
'-cc',
'depth',
'-d',str(threads)]
149 logger.info(
'No threads (threads=%s).'%(repr(threads),))
153 inside_aprun=int(os.environ.get(
'INSIDE_APRUN',
'0'))
155 runner=runner.env(INSIDE_APRUN=str(inside_aprun))
157 if arg.nranks()==1
and allranks:
158 for rank,count
in arg.expand_iter(expand=
False):
159 pernode=min(maxtasks,nodesize)
160 runner=runner[
'-j',
'%d'%(hyperthreads),
162 '-n',
'%d'%(maxtasks)][rank.args()]
166 raise MPIAllRanksError(
167 "When using allranks=True, you must provide an mpi program "
168 "specification with only one MPI rank (to be duplicated across "
171 lines=[a
for a
in arg.to_arglist(to_shell=
True,expand=
True)]
173 raise MPISerialMissing(
174 'Attempting to run a serial program via aprun mpiserial but '
175 'the mpiserial program is not in your $PATH.')
176 pernode=min(arg.nranks(),nodesize)
177 return runner[
'-n%s'%(arg.nranks()),
178 '-j',
'1',
'-N',
'%d'%pernode,
'mpiserial']\
179 .prerun(CMDFGen(
'serialcmdf',lines,**kwargs)) \
185 for rank,count
in arg.expand_iter(expand=
False):
186 maxtasks=max(count,maxtasks)
188 for rank,count
in arg.expand_iter(expand=
False):
189 pernode=min(count,maxtasks,nodesize)
190 if not first: runner=runner[
':']
191 runner=runner[
'-j',
'%d'%(hyperthreads),
This module provides a set of utility functions to do filesystem operations.
def runsync()
Runs the "sync" command as an exe().
def detect()
Determines if Cray aprun should be used to run MPI programs by looking for the aprun program in $PATH...
def make_bigexe(exe, **kwargs)
Returns an ImmutableRunner that will run the specified program.
Implements produtil.run: provides the object tree for representing shell commands.
Calls sync and aprun -B sync.
def can_run_mpi()
Does this module represent an MPI implementation? Returns True.
Raised when os.symlink makes a symlink to a target other than the one that was requested.
def mpirunner2(arg, allranks=False, logger=None, **kwargs)
Turns a produtil.mpiprog.MPIRanksBase tree into a produtil.prog.Runner.
Object structure for describing MPI programs.
def openmp(arg, threads)
Adds OpenMP support to the provided object.
This is the abstract superclass of all classes that represent one or more MPI ranks, including MPI ranks that are actually serial programs.
Represents a single stage of a pipeline to execute.
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the given name.
A copy-on-write version of Runner.