"""!A shell-like syntax for running serial, MPI and OpenMP programs.

This module implements a shell-like syntax for launching MPI and
non-MPI programs from Python.  It recognizes three types of
executables: mpi, "small serial" (safe for running on a batch node)
and "big serial" (which should be run via aprun if applicable).  There
is no difference between "small serial" and "big serial" programs
except on certain architectures (like Cray) where the job script runs
on a heavily-loaded batch node and has compute nodes assigned for
running other programs.

@section progtype Program Types

There are three types of programs: mpi, serial and "big non-MPI."  A
"big" executable is one that is either OpenMP, or is a serial program
that cannot safely be run on heavily loaded batch nodes.  On Cray
architecture machines, the job script runs on a heavily-populated
"batch" node, with some compute nodes assigned for "large" programs.
In such environments, the "big" executables are run on compute nodes
and the small ones on the batch node.

* mpi('exename') = an executable "exename" that calls MPI_Init and
    MPI_Finalize exactly once each, in that order.
* exe('exename') = a small non-MPI program safe to run on a batch node
* bigexe('exename') = a big non-MPI program that must be run on a
    compute node; it may or may not use other forms of parallelism

You can also make reusable aliases to avoid having to call those
functions over and over (more on that later).  Examples:

* Python:   wrf=mpi('./wrf.exe')
* Python:   lsl=alias(exe('/bin/ls')['-l'].env(LANG='C',LS_COLORS='never'))

Those can then be reused later on as if the code were pasted in.

@section serexs Serial Execution Syntax

Select your serial programs by exe('name') for small serial programs
and bigexe('name') for big serial programs.  The return value of those
functions can then be used with a shell-like syntax to specify
redirection and piping.  Example:

*  shell version: ls -l / | wc -l
*  Python version: run(exe('ls')['-l','/'] | exe('wc')['-l'])
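The shell-like syntax relies on Python operator overloading: `[]` maps to `__getitem__` and `|` maps to `__or__`. The sketch below is a hypothetical, stripped-down model of that idea. `ToyRunner`, `ToyPipeline` and the local `exe()` are invented for illustration and are not the produtil.prog classes. It only builds a command description and renders it as a shell string; it does not run anything.

```python
class ToyRunner:
    # Hypothetical stand-in for an exe() result: a program plus its arguments.
    def __init__(self, name):
        self.argv = [name]

    def __getitem__(self, args):
        # r['-l'] passes one string; r['-l', '/'] passes a tuple of strings.
        if isinstance(args, str):
            args = (args,)
        # Like the plain exe() results described under Aliases, this
        # appends to the existing object and returns it.
        self.argv.extend(args)
        return self

    def __or__(self, other):
        # a | b starts a pipeline with a as the first stage.
        return ToyPipeline([self]) | other

    def to_shell(self):
        return ' '.join(self.argv)


class ToyPipeline:
    # Hypothetical pipeline: an ordered list of ToyRunner stages.
    def __init__(self, stages):
        self.stages = list(stages)

    def __or__(self, other):
        # Accept either a single command or another pipeline on the right.
        more = other.stages if isinstance(other, ToyPipeline) else [other]
        return ToyPipeline(self.stages + more)

    def to_shell(self):
        return ' | '.join(stage.to_shell() for stage in self.stages)


def exe(name):
    # Hypothetical constructor mirroring the exe('name') spelling above.
    return ToyRunner(name)


cmd = exe('ls')['-l', '/'] | exe('wc')['-l']
print(cmd.to_shell())  # ls -l / | wc -l
```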

Redirection syntax is similar to the shell (<, >, >> and << operators):

  run( ( exe('myprogram')['arg1','arg2','...'] < 'infile' ) > 'outfile')

Note the extra set of parentheses.  You cannot write
"exe('prog') < infile > outfile" because Python treats "a < b > c" as
the chained comparison "(a < b) and (b > c)", not as two redirections
applied in sequence.
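The chained-comparison rule is easy to check with a throwaway class. In this sketch, `Redirectable` is a hypothetical stand-in whose `<` and `>` only record what happened. The point is that without parentheses, Python never calls `>` on the object returned by `<`.

```python
class Redirectable:
    # Hypothetical command object that records redirections applied to it.
    def __init__(self, log=()):
        self.log = list(log)

    def __lt__(self, infile):
        # cmd < 'infile': return a new object with stdin recorded.
        return Redirectable(self.log + ['stdin=' + infile])

    def __gt__(self, outfile):
        # cmd > 'outfile': return a new object with stdout recorded.
        return Redirectable(self.log + ['stdout=' + outfile])


prog = Redirectable()

# Parenthesized: both redirections are applied, in order.
ok = (prog < 'infile') > 'outfile'
print(ok.log)  # ['stdin=infile', 'stdout=outfile']

# Unparenthesized: Python evaluates (prog < 'infile') and ('infile' > 'outfile').
# The first part is truthy, so the result is the plain string comparison.
bad = prog < 'infile' > 'outfile'
print(bad)  # False
```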

Append output to a file with >>:

  run(exe('myprogram')['arg1','arg2','...'] >> 'appendfile')

You can also send strings as input with <<:

  run(exe('myprogram')['arg1','arg2','...'] << 'some input string')

One difference from shells is that < and << always modify the
beginning of the pipeline:

* shell: cat < infile | wc -l
* Python #1: ( exe('cat') < 'infile' ) | exe('wc')['-l']
* Python #2: exe('cat') | ( exe('wc')['-l'] < 'infile' )

Note that the second one, equivalent to `cat|wc -l<infile`, would
NOT work in a shell since you would be giving wc -l two inputs.
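One way to get the "input always goes to the front" behavior is for the pipeline to carry a single input redirection and apply it to its first stage when stages are joined. The sketch below is a hypothetical model of that rule. `Stage` and `Pipe` are invented here and do not reflect the real produtil implementation. It shows why Python #1 and Python #2 above describe the same command.

```python
class Stage:
    # Hypothetical single command: just its argument vector.
    def __init__(self, *argv):
        self.argv = argv

    def __lt__(self, infile):
        # stage < 'file' becomes a one-stage pipeline with that input.
        return Pipe([self]) < infile

    def __or__(self, other):
        return Pipe([self]) | other


class Pipe:
    # Hypothetical pipeline: ordered stages plus one optional input file.
    def __init__(self, stages, stdin=None):
        self.stages = list(stages)
        self.stdin = stdin

    def __lt__(self, infile):
        # Input redirection belongs to the pipeline as a whole, so it
        # always feeds the first stage.
        return Pipe(self.stages, stdin=infile)

    def __or__(self, other):
        if not isinstance(other, Pipe):
            other = Pipe([other])
        # Whichever side carried the input, it stays at the front of the
        # combined pipeline (the left side wins if both had one).
        return Pipe(self.stages + other.stages,
                    stdin=self.stdin if self.stdin is not None else other.stdin)

    def to_shell(self):
        parts = [' '.join(stage.argv) for stage in self.stages]
        if self.stdin is not None:
            # A shell needs the redirection on the first command.
            parts[0] += ' < ' + self.stdin
        return ' | '.join(parts)


one = (Stage('cat') < 'infile') | Stage('wc', '-l')
two = Stage('cat') | (Stage('wc', '-l') < 'infile')
print(one.to_shell())  # cat < infile | wc -l
print(two.to_shell())  # cat < infile | wc -l
```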

@section parexs Parallel Execution Syntax

Use mpi('exename') to select your executable, use [] to set arguments,
use multiplication to set the number of ranks and use addition to
combine different executables together into a multiple program
multiple data (MPMD) MPI program.
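Multiplication and addition map naturally onto the colon-separated MPMD launch line that mpiexec-style launchers accept, where each executable gets its own rank count. The sketch below is a hypothetical model. `Ranks`, `Group`, the local `mpi()` and the fixed `mpiexec` launcher name are all assumptions; the real mpirun() chooses the launcher and its flags per platform. The sketch only shows how `*` and `+` can build such a command line.

```python
class Ranks:
    # Hypothetical description of one executable and its rank count.
    def __init__(self, exe, count=1):
        self.exe = exe
        self.count = count

    def __mul__(self, n):
        # mpi('x.exe')*30 means "run x.exe on 30 ranks".
        return Ranks(self.exe, self.count * n)

    def __add__(self, other):
        return Group([self]) + other


class Group:
    # Hypothetical MPMD program: several executables in one MPI job.
    def __init__(self, members):
        self.members = list(members)

    def __add__(self, other):
        more = other.members if isinstance(other, Group) else [other]
        return Group(self.members + more)

    def total_ranks(self):
        return sum(m.count for m in self.members)

    def command_line(self, launcher='mpiexec'):
        # Colon-separated MPMD segments, one per executable.
        segments = ['-n %d %s' % (m.count, m.exe) for m in self.members]
        return launcher + ' ' + ' : '.join(segments)


def mpi(exe):
    return Ranks(exe)


# * binds tighter than +, so each rank count is applied before grouping.
job = mpi('wm3c.exe') + mpi('hycom.exe') * 30 + mpi('wrf.exe') * 204
print(job.command_line())
# mpiexec -n 1 wm3c.exe : -n 30 hycom.exe : -n 204 wrf.exe
print(job.total_ranks())  # 235
```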

Run ten copies of ls -l:

  run(mpirun(mpiserial(exe('ls')['-l'])*10))

Run HyCOM coupled HWRF: one wm3c.exe, 30 hycom.exe and 204 wrf.exe:

  run(mpirun(mpi('wm3c.exe') + mpi('hycom.exe')*30 + mpi('wrf.exe')*204))

You can set environment variables, pipe MPI output and handle
redirection using the mpirun() function, which converts MPI programs
into a bigexe()-style object (Runner):

Shell version:

    result=$( mpirun -n 30 hostname | sort -u | wc -l )

Python version:

    result=runstr( mpirun(mpi('hostname')*30) | exe('sort')['-u'] | exe('wc')['-l'] )
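Here is a rough standard-library equivalent of capturing a non-MPI pipeline's output, built on `subprocess.Popen`. It is an illustrative sketch, not how produtil implements runstr(), and the `run_pipeline_capture` name is hypothetical. Like the run() documentation describes for pipelines, it reports the highest exit status among the stages.

```python
import subprocess
import sys


def run_pipeline_capture(*commands):
    # Start each stage with stdin connected to the previous stage's stdout.
    procs = []
    prev_stdout = None
    for argv in commands:
        proc = subprocess.Popen(argv, stdin=prev_stdout, stdout=subprocess.PIPE)
        if prev_stdout is not None:
            # Close this process's copy so the upstream stage gets SIGPIPE
            # if the downstream stage exits early.
            prev_stdout.close()
        prev_stdout = proc.stdout
        procs.append(proc)
    # Only the last stage's output is read here.
    output, _ = procs[-1].communicate()
    statuses = [p.wait() for p in procs]
    return output.decode(), max(statuses)


# Portable demo: the first stage prints three lines, and the second counts
# distinct lines (roughly "sort -u | wc -l" in one step).
producer = [sys.executable, '-c', "print('b'); print('a'); print('b')"]
counter = [sys.executable, '-c',
           "import sys; print(len(set(sys.stdin.read().split())))"]
text, status = run_pipeline_capture(producer, counter)
print(text.strip(), status)  # 2 0
```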

@section aliases Aliases

If you find yourself frequently needing the same command, or you need
to store a command for multiple uses, then you should define an
alias.  Let's say you want "long output" format Japanese language "ls"
output:

  exe('ls')['-l','/path/to/dir'].env(LANG='JP')

but you find yourself running that on many different directories.
Then you may want to make an alias:

  jplsl=alias(exe('ls')['-l'].env(LANG='JP'))

The return value jplsl can be treated as an exe()-like return value
since it was from exe() originally, but any new arguments will be
appended to the original set:

  run(jplsl['/path/to/dir'])

Note that if we did this:

  badlsl=exe('ls')['-l'].env(LANG='JP')  # Bad! No alias!
  run(badlsl['/'])  # will list /
  run(badlsl['/home'])  # will list / and /home
  run(badlsl['/usr/bin']) # will list / /home and /usr/bin

instead of this:

  goodlsl=alias(exe('ls')['-l'].env(LANG='JP'))
  run(goodlsl['/'])  # will list /
  run(goodlsl['/home'])  # will list /home
  run(goodlsl['/usr/bin']) # will list /usr/bin

then run(badlsl['/home']) would list /home AND /, which is NOT what
we want.  Why does it do that?  Because badlsl is not an alias; it is
a regular output from exe(), so every time we call its []
operator, we add an argument to the original command.  When we call
alias() it returns a copy-on-write version (goodlsl), where every call
to [] creates a new object.
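The difference between badlsl and goodlsl can be reproduced with two small classes. This is a hypothetical model of the behavior described above. `MutableCmd`, `CowCmd` and the local `alias()` are invented names, and the real produtil.prog.ImmutableRunner does more. The first class appends arguments in place; the second returns a fresh, modifiable copy on every `[]`.

```python
class MutableCmd:
    # Like the plain exe() result described above: [] edits this object.
    def __init__(self, *argv):
        self.argv = list(argv)

    def __getitem__(self, args):
        if isinstance(args, str):
            args = (args,)
        self.argv.extend(args)
        return self


class CowCmd(MutableCmd):
    # Copy-on-write: [] modifies a fresh duplicate, never the alias itself.
    def __getitem__(self, args):
        return MutableCmd(*self.argv)[args]


def alias(cmd):
    # Hypothetical alias(): freeze the current arguments into a CowCmd.
    return CowCmd(*cmd.argv)


badlsl = MutableCmd('ls', '-l')
print(badlsl['/'].argv)      # ['ls', '-l', '/']
print(badlsl['/home'].argv)  # ['ls', '-l', '/', '/home']

goodlsl = alias(MutableCmd('ls', '-l'))
print(goodlsl['/'].argv)      # ['ls', '-l', '/']
print(goodlsl['/home'].argv)  # ['ls', '-l', '/home']
```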

Note that alias() also works with pipelines, but most operations will
only modify the last command in the pipeline (or the first, for
operations that change stdin).
__all__=['alias','exe','run','runstr','mpi','mpiserial','mpirun',
         'runbg','prog','mpiprog','mpiimpl','waitprocs','runsync',
         'InvalidRunArgument','ExitStatusException','checkrun',

module_logger=logging.getLogger('produtil.run')
 
    """!Raised to indicate that an invalid argument was sent into one
    of the run module functions."""
    """!Raised to indicate that a program generated an invalid return
    code.

    Examine the "returncode" member variable for the returncode value.
    Negative values indicate the program was terminated by a signal
    while zero and positive values indicate the program exited.  The
    highest exit status of the pipeline is returned when a pipeline is
    used.
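Python's own subprocess module uses the same convention for signals: `Popen.returncode` is `-N` when the child was killed by signal N (POSIX only). The short check below illustrates that convention with the standard library alone. It does not exercise produtil itself.

```python
import signal
import subprocess
import sys

# Child process that terminates itself with SIGTERM (POSIX only).
child = [sys.executable, '-c',
         'import os, signal; os.kill(os.getpid(), signal.SIGTERM)']
status = subprocess.run(child).returncode
print(status)  # -15 on Linux, i.e. -signal.SIGTERM

# A normal exit reports the value passed to sys.exit().
normal = subprocess.run([sys.executable, '-c', 'import sys; sys.exit(4)'])
print(normal.returncode)  # 4
```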

    For MPI programs, the exit status is generally unreliable due to
    implementation-dependent issues, but this package attempts to
    return the highest exit status seen.  Generally, you can count on
    MPI implementations to return zero if you call MPI_Finalize() and
    exit normally, and non-zero if you call MPI_Abort with a non-zero
    argument.  Any other situation will produce unpredictable results."""
        """!ExitStatusException constructor
        @param message a description of what went wrong
        @param status the exit status"""

        """!An alias for self.returncode: the exit status."""

        """!A string description of the error."""

        """!A pythonic description of the error for debugging."""
    """!Attempts to generate an unmodifiable "copy on write" version
    of the argument.  The returned copy will generate a modifiable
    duplicate of itself if you attempt to change it.
    @returns a produtil.prog.ImmutableRunner
    @param arg a produtil.prog.Runner or produtil.prog.ImmutableRunner"""
        arg.make_runners_immutable()

        raise InvalidRunArgument(
            'Arguments to alias() must be Runner objects (such as from '
            'exe()) or MPIRanksBase objects (such as from mpi() or '
            'mpiserial()).  Got: %s'%(repr(arg),))
 
    """!Returns a prog.ImmutableRunner object that represents a small
    serial program that can be safely run on a busy batch node.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""

    """!Returns a prog.ImmutableRunner object that represents a large
    serial program that must be run on a compute node.
    @note This function does NOT search $PATH on Cray.  That ensures
      the $PATH will be expanded on the compute node instead.  Use
      produtil.fileop.find_exe() if you want to explicitly search the
      PATH before execution.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""
    return mpiimpl.make_bigexe(str(name),**kwargs)

    """!Alias for exe() for backward compatibility.  Use exe() instead."""
    return exe(name,**kwargs)
 
    """!Converts an MPI program specification into a runnable shell
    program suitable for run(), runstr() or checkrun().

    * allranks=True: run on all available MPI ranks.  This cannot be
      used if a specific number of ranks (other than 1) was requested in
      arg.
    * logger=L: a logging.Logger for log messages
    * Other platform-specific arguments.  See produtil.mpi_impl for details.

    @param arg the mpiprog.MPIRanksBase describing the MPI program to
    run.  This is the output of the mpi() or mpiserial() function.
    @param kwargs additional arguments to control output.
    @returns a prog.Runner object for the specified
    mpiprog.MPIRanksBase object."""
    return mpiimpl.mpirunner(arg,**kwargs)
 
    """!This internal implementation function generates a
    prog.PopenCommand object for the specified input, which may be a
    prog.Runner or mpiprog.MPIRanksBase.
    @param arg the produtil.prog.Runner to convert.  This is the
      output of exe(), bigexe() or mpirun()
    @param capture if True, capture the stdout into a string
    @param kwargs additional keyword arguments, same as for mpirun()"""
        runner=mpiimpl.mpirunner(arg,**kwargs)

            'Can only run a Runner object (such as from exe()) or an '
            'MPIRanksBase object (such as from mpi() or mpiserial()).  '
            'Got: %s'%(repr(arg),))

    if 'logger' in kwargs: logger=kwargs['logger']
    if logger is not None:
        logger.info('Starting: %s'%(repr(arg),))
        if capture: logger.info('  - and will capture output.')

    if logger is not None:
        logger.debug('Pipeline is %s'%(repr(pl),))
 
def runbg(arg,capture=False,**kwargs):
    """!Not implemented: background execution

    Runs the specified process in the background.  Specify
    capture=True to capture the command's output.  Returns a
    produtil.prog.PopenCommand.  Call poll() to determine process
    completion, and use the stdout_data property to get the output
    after completion, if capture=True was specified.

    @bug produtil.run.runbg() is not implemented

    @warning this is not implemented

    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param capture if True, capture output
    @param kwargs same as for mpirun()"""

def waitprocs(procs,logger=None,timeout=None,usleep=1000):
    """!Not implemented: background process monitoring

    Waits for one or more backgrounded processes to complete.  Logs to
    the specified logger while doing so.  If a timeout is specified,
    returns False after the given time if some processes have not
    returned.  The usleep argument is the number of microseconds to
    sleep between checks (can be a fraction).  The first argument,
    procs, specifies the processes to check.  It must be a
    produtil.prog.Pipeline (return value from runbg) or an iterable
    (list or tuple) of such.
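The polling strategy described above can be sketched with the standard library's `Popen.poll()`: check each process, give up after a timeout, and sleep `usleep` microseconds between rounds. This is an illustrative stand-in, not the produtil implementation. The `wait_all` name is hypothetical. Unlike the function documented here, it works on `subprocess.Popen` objects, and it takes the timeout in seconds, which is an assumption because the text above does not give the unit.

```python
import subprocess
import sys
import time


def wait_all(procs, timeout=None, usleep=1000):
    # Return True once every process has exited, or False if the
    # timeout (in seconds) passes first.  usleep is in microseconds.
    pending = list(procs)
    start = time.monotonic()
    while pending:
        # poll() returns None while a process is still running.
        pending = [p for p in pending if p.poll() is None]
        if not pending:
            break
        if timeout is not None and time.monotonic() - start > timeout:
            return False
        time.sleep(usleep / 1.0e6)
    return True


quick = subprocess.Popen([sys.executable, '-c', 'pass'])
print(wait_all([quick], timeout=30))  # True

slow = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(30)'])
print(wait_all([slow], timeout=0.2, usleep=50000))  # False
slow.kill()
slow.wait()
```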

    @bug produtil.run.waitprocs() is untested

    @warning This is not tested and probably does not work.

    @param procs the processes to watch
    @param logger the logging.Logger for log messages
    @param timeout how long to wait before giving up
    @param usleep sleep time between checks"""
    if isinstance(procs,produtil.prog.PopenCommand):

    if logger is not None: logger.info("Wait for: %s",repr(p))

                if logger is not None:
                    logger.info("%s returned %s"%(repr(proc),repr(ret)))
            elif logger is not None and usleep>4.99999e6:
                logger.info("%s is still running"%(repr(proc),))

        if usleep>4.99999e6 and logger is not None:
            logger.info("... sleep %f ..."%(float(usleep/1.e6),))
        time.sleep(usleep/1.e6)
    return False if p else True
    """!Runs the "sync" command as an exe()."""
    return mpiimpl.runsync(logger=logger)
 
def run(arg,logger=None,sleeptime=None,**kwargs):
    """!Executes the specified program and attempts to return its exit
    status.  In the case of a pipeline, the highest exit status seen
    is returned.  For MPI programs, exit statuses are unreliable and
    generally implementation-dependent, but it is usually safe to
    assume that a program that runs MPI_Finalize() and exits normally
    will return 0, and anything that runs MPI_Abort(MPI_COMM_WORLD)
    will return non-zero.  Programs that exit due to a signal will
    return statuses >255 and can be interpreted with WTERMSIG,
    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param sleeptime time to sleep between checks of child process
    @param kwargs ignored"""
    p.communicate(sleeptime=sleeptime)

    if logger is not None:
        logger.info('  - exit status %d'%(int(result),))
 
    """!This is a simple wrapper around run() that raises
    ExitStatusException if the program exit status is non-zero.

    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param kwargs The optional ret=[] argument can provide a different
    list of acceptable exit statuses."""
    r=run(arg,logger=logger)
    if kwargs is not None and 'ret' in kwargs:
        if r not in kwargs['ret']:
 
    """!Sets the number of OpenMP threads for the specified program.

    @warning Generally, when using MPI with OpenMP, the batch system
    must be configured correctly to handle this or unexpected errors

    @param arg The "arg" argument must be from mpiserial, mpi, exe or

    @param threads The optional "threads" argument is an integer number of
    threads.  If it is not specified, the maximum possible number of
    threads will be used.  Note that using threads=None with
    mpirun(...,allranks=True) will generally not work unless the batch
    system has already configured the environment correctly for an
    MPI+OpenMP task with default maximum threads and ranks.
    @returns see run()"""
    return mpiimpl.openmp(arg,threads)
 
    """!Executes the specified program or pipeline, capturing its
    stdout and returning that as a string.

    If the exit status is non-zero, then ExitStatusException is thrown.
    For example,

      runstr(exe('false'),ret=(1,))

    succeeds if "false" returns 1, and raises ExitStatusException otherwise.
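Note that `ret` must be a sequence: `(1)` is just the integer 1, while `(1,)` is a one-element tuple, and a membership test such as `r in kwargs['ret']` fails on a bare integer. The hypothetical `check_status` helper below reproduces that test in isolation. It raises a local stand-in exception (`StatusError`) rather than the real ExitStatusException, and it shows both spellings.

```python
class StatusError(Exception):
    # Local stand-in for ExitStatusException, defined only for this sketch.
    def __init__(self, message, status):
        super().__init__(message)
        self.returncode = status


def check_status(r, ret=(0,)):
    # Accept r only if it is one of the allowed exit statuses.
    if r not in ret:
        raise StatusError('exit status %d not in %r' % (r, ret), r)
    return r


print(check_status(1, ret=(1,)))  # 1

try:
    check_status(1, ret=(1))  # (1) is the int 1, not a tuple
except TypeError as err:
    print('TypeError:', err)
```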

    @param arg The "arg" argument must be from mpiserial, mpi, exe or
    @param logger a logging.Logger for logging messages
    @param kwargs You can specify an optional list or tuple "ret" that
    contains an alternative list of valid return codes.  Programs that
    exit normally have zero or positive return codes; negative values
    represent signal-terminated programs (e.g. SIGTERM produces -15,
    SIGKILL produces -9, etc.)"""
    if kwargs is not None and 'ret' in kwargs:
        if r not in kwargs['ret']:
 
    """!Returns an MPIRank object that represents the specified MPI
    executable.
    @param arg the MPI program to run
    @param kwargs logger=L for a logging.Logger to log messages"""

    """!Generates an mpiprog.MPISerial object that represents an MPI
    rank that executes a serial (non-MPI) program.  The given value
    MUST be from bigexe() or exe(), NOT from mpi().
    @param arg the MPI program to run
    @param kwargs logger=L for a logging.Logger to log messages"""