HWRF  trunk@4391
run.py
"""!A shell-like syntax for running serial, MPI and OpenMP programs.

This module implements a shell-like syntax for launching MPI and
non-MPI programs from Python. It recognizes three types of
executables: mpi, "small serial" (safe for running on a batch node)
and "big serial" (which should be run via aprun if applicable). There
is no difference between "small serial" and "big serial" programs
except on certain architectures (like Cray) where the job script runs
on a heavily-loaded batch node and has compute nodes assigned for
running other programs.

@section progtype Program Types

There are three types of programs: mpi, serial and "big non-MPI." A
"big" executable is either an OpenMP program or a serial program
that cannot safely be run on heavily-loaded batch nodes. On Cray
architecture machines, the job script runs on a heavily-populated
"batch" node, with some compute nodes assigned for "large" programs.
In such environments, the "big" executables are run on compute nodes
and the small ones on the batch node.

* mpi('exename') = an executable "exename" that calls MPI_Init and
  MPI_Finalize exactly once each, in that order.
* batchexe('exename') = a small non-MPI program safe to run on a batch node
* exe('exename') = a big non-MPI program that must be run on a
  compute node; it may or may not use other forms of parallelism.
  bigexe('exename') is an older alias for exe().

You can also make reusable aliases to avoid having to call those
functions over and over (more on that later). Examples:

* Python: wrf=mpi('./wrf.exe')
* Python: lsl=alias(exe('/bin/ls')['-l'].env(LANG='C',LS_COLORS='never'))

Those can then be reused later on as if the code were pasted in, similar
to a shell alias.

@section serexs Serial Execution Syntax

Select your serial programs with batchexe('name') for small serial
programs and exe('name') (or its older alias bigexe('name')) for big
serial programs. The return value of those functions can then be used
with a shell-like syntax to specify redirection and piping. Example:

* shell version: ls -l / | wc -l
* Python version: run(exe('ls')['-l','/'] | exe('wc')['-l'])

Redirection uses a syntax similar to the shell's < and > operators:
@code
  run( ( exe('myprogram')['arg1','arg2','...'] < 'infile' ) > 'outfile')
@endcode

Note the extra set of parentheses: you cannot write
"exe('prog') < infile > outfile" because Python treats a < b > c as the
chained comparison (a < b) and (b > c), so the second operator compares
the two file names instead of redirecting the program's output.

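The parentheses matter because of comparison chaining, not just operator precedence. The toy class below is a hypothetical stand-in for the objects returned by exe(); it is not produtil.prog.Runner. It only records redirections, which is enough to show what each spelling evaluates to:

```python
# Hypothetical stand-in for a command object: it only records redirections.
class Cmd:
    def __init__(self, name):
        self.name = name
        self.stdin = None
        self.stdout = None

    def __lt__(self, infile):
        # Models "cmd < infile": remember the input file and return self.
        self.stdin = infile
        return self

    def __gt__(self, outfile):
        # Models "cmd > outfile": remember the output file and return self.
        self.stdout = outfile
        return self


# Without parentheses Python evaluates a chained comparison:
#   (Cmd('prog') < 'infile') and ('infile' > 'outfile')
# The second test compares two plain strings, so the result is a bool.
chained = Cmd('prog') < 'infile' > 'outfile'

# With parentheses both operators are applied to the command object.
grouped = (Cmd('prog') < 'infile') > 'outfile'

print(type(chained).__name__)         # bool
print(grouped.stdin, grouped.stdout)  # infile outfile
```

In this toy, the chained form evaluates to False because 'infile' > 'outfile' is an ordinary string comparison. The output redirection never reaches the command object.
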
Append also works:
@code
  run(exe('myprogram')['arg1','arg2','...'] >> 'appendfile')
@endcode

You can also send a string as input with <<:
@code
  run(exe('myprogram')['arg1','arg2','...'] << 'some input string')
@endcode

One difference from shells is that < and << always modify the
beginning of the pipeline:

* shell: cat < infile | wc -l
* Python #1: ( exe('cat') < 'infile' ) | exe('wc')['-l']
* Python #2: exe('cat') | ( exe('wc')['-l'] < 'infile' )

Note that the second one, equivalent to `cat|wc -l<infile`, would
NOT work in a shell since you would be giving wc -l two inputs.

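One way to picture this rule is a pipeline that stores its stages in order plus a single input source that always feeds the first stage. The Pipe class and stage() helper below are a hypothetical model invented for illustration. They are not how produtil.prog represents pipelines:

```python
# Hypothetical toy pipeline: ordered stage names plus ONE input source,
# which always feeds the first stage.
class Pipe:
    def __init__(self, *stages, stdin=None):
        self.stages = list(stages)
        self.stdin = stdin

    def __or__(self, other):
        # "a | b": concatenate the stages. The combined pipeline keeps
        # whichever side already had an input (the left one if both do).
        return Pipe(*(self.stages + other.stages),
                    stdin=self.stdin or other.stdin)

    def __lt__(self, infile):
        # "p < infile": set the input of the whole pipeline, i.e. of its
        # first stage, no matter where in the expression it is written.
        return Pipe(*self.stages, stdin=infile)


def stage(name):
    # Hypothetical helper: a one-stage pipeline with no input yet.
    return Pipe(name)


one = (stage('cat') < 'infile') | stage('wc -l')
two = stage('cat') | (stage('wc -l') < 'infile')

print(one.stages, one.stdin)  # ['cat', 'wc -l'] infile
print(two.stages, two.stdin)  # ['cat', 'wc -l'] infile
```

Both spellings produce the same stages and the same input. This toy keeps the left-hand input if both sides have one; what produtil does in that case is not covered here.
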
@section parexs Parallel Execution Syntax

Use mpi('exename') to select an MPI executable (or
mpiserial(exe('exename')) for a serial one), use [] to set arguments,
use multiplication to set the number of ranks and use addition to
combine different executables together into a multiple program
multiple data (MPMD) MPI program.

Run ten copies of ls -l:
@code
  run(mpirun(mpiserial(exe('ls')['-l'])*10))
@endcode

Run HyCOM-coupled HWRF: one wm3c.exe, 30 hycom.exe and 204 wrf.exe:
@code
  run(mpirun(mpi('wm3c.exe') + mpi('hycom.exe')*30 + mpi('wrf.exe')*204))
@endcode

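The arithmetic follows Python's normal precedence: * binds tighter than +. The coupled example above therefore describes groups of 1, 30 and 204 ranks, 235 in total. The Ranks class and toy_mpi() helper below are hypothetical. They only model that bookkeeping; they launch nothing and do not mirror produtil.mpiprog internals:

```python
# Hypothetical model of an MPMD specification as (executable, count) pairs.
class Ranks:
    def __init__(self, groups):
        self.groups = list(groups)

    def __mul__(self, n):
        # "prog * n": multiply the rank count of every group by n.
        return Ranks((name, count * n) for name, count in self.groups)

    def __add__(self, other):
        # "a + b": the ranks of a followed by the ranks of b.
        return Ranks(self.groups + other.groups)

    def nranks(self):
        return sum(count for _, count in self.groups)


def toy_mpi(name):
    # Hypothetical helper: one rank of the named executable.
    return Ranks([(name, 1)])


coupled = toy_mpi('wm3c.exe') + toy_mpi('hycom.exe') * 30 + toy_mpi('wrf.exe') * 204
print(coupled.groups)    # [('wm3c.exe', 1), ('hycom.exe', 30), ('wrf.exe', 204)]
print(coupled.nranks())  # 235
```
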
You can set environment variables, pipe MPI output and handle
redirection using the mpirun() function, which converts MPI programs
into a bigexe()-style object (Runner):

Shell version:
@code{.unformatted}
  result=$( mpirun -n 30 hostname | sort -u | wc -l )
@endcode

Python version:
@code
  result=runstr( mpirun(mpi('hostname')*30) | exe('sort')['-u'] | exe('wc')['-l'] )
@endcode

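For comparison, the capture-the-last-stage idea can be sketched with the standard subprocess module. This is an analogue under stated assumptions, not what runstr() does internally. It runs two Python interpreters (via sys.executable) in place of hostname, sort -u and wc -l, and it captures the last stage's stdout. It then takes the highest exit status, the convention run() documents for pipelines:

```python
import subprocess
import sys

# Stage 1 prints some words; stage 2 counts the distinct ones (like sort -u | wc -l).
producer = subprocess.Popen(
    [sys.executable, '-c', 'print("b"); print("a"); print("b")'],
    stdout=subprocess.PIPE)
consumer = subprocess.Popen(
    [sys.executable, '-c', 'import sys; print(len(set(sys.stdin.read().split())))'],
    stdin=producer.stdout, stdout=subprocess.PIPE, universal_newlines=True)
producer.stdout.close()  # leave the consumer as the only reader of the pipe

result, _ = consumer.communicate()  # capture the last stage's stdout
producer.wait()

# Report the highest exit status seen anywhere in the pipeline.
status = max(producer.returncode, consumer.returncode)
print(result.strip(), status)  # 2 0
```

Closing producer.stdout in the parent means the producer would receive SIGPIPE if the consumer exited early, as in a shell pipeline.
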
@section aliases Aliases

If you find yourself frequently needing the same command, or you need
to store a command for multiple uses, then you should define an
alias. Let's say you want Japanese-language "ls" output in long
format:

@code
  exe('ls')['-l','/path/to/dir'].env(LANG='JP')
@endcode

but you find yourself running that on many different directories.
Then you may want to make an alias:

@code
  jplsl=alias(exe('ls')['-l'].env(LANG='JP'))
@endcode

The return value jplsl can be treated as an exe()-like return value
since it was from exe() originally, but any new arguments will be
appended to the original set:

@code
  run(jplsl['/path/to/dir'])
@endcode

Consider what happens if we do this:
@code
  badlsl=exe('ls')['-l'].env(LANG='JP') # Bad! No alias!
  run(badlsl['/']) # will list /
  run(badlsl['/home']) # will list / and /home
  run(badlsl['/usr/bin']) # will list /, /home and /usr/bin

  goodlsl=alias(exe('ls')['-l'].env(LANG='JP'))
  run(goodlsl['/']) # will list /
  run(goodlsl['/home']) # will list /home
  run(goodlsl['/usr/bin']) # will list /usr/bin
@endcode

Then run(badlsl['/home']) would list /home AND /, which is NOT what
we want. Why does it do that? Because badlsl is not an alias: it is a
regular output from exe(), so every call to its [] operator adds an
argument to the original command. Calling alias() returns a
copy-on-write version (goodlsl), where every call to [] creates a new
object.

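The badlsl/goodlsl difference is in-place mutation versus copy-on-write. The two classes below are hypothetical and model only that idea; they are not produtil.prog.Runner or ImmutableRunner. MutableCmd appends arguments to itself, while FrozenCmd starts from a fresh copy on every [] call:

```python
# Hypothetical toy command: [] appends arguments IN PLACE and returns self.
class MutableCmd:
    def __init__(self, args):
        self.args = list(args)

    def __getitem__(self, extra):
        # cmd['a'] passes a str; cmd['a', 'b'] passes a tuple.
        if not isinstance(extra, tuple):
            extra = (extra,)
        self.args.extend(extra)
        return self


# Hypothetical copy-on-write wrapper: [] builds a fresh MutableCmd each time.
class FrozenCmd:
    def __init__(self, cmd):
        self._args = tuple(cmd.args)

    def __getitem__(self, extra):
        return MutableCmd(self._args)[extra]


bad = MutableCmd(['ls', '-l'])
print(bad['/'].args)       # ['ls', '-l', '/']
print(bad['/home'].args)   # ['ls', '-l', '/', '/home']  (arguments pile up)

good = FrozenCmd(MutableCmd(['ls', '-l']))
print(good['/'].args)      # ['ls', '-l', '/']
print(good['/home'].args)  # ['ls', '-l', '/home']
```
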
Note that alias() also works with pipelines, but most operations will
only modify the last command in the pipeline (or the first, for
operations that change stdin).
"""

import time, logging
import produtil.mpi_impl as mpiimpl
import produtil.sigsafety # sets up signal handlers to ensure a clean exit
import produtil.prog as prog
import produtil.mpiprog as mpiprog
import produtil.pipeline as pipeline

##@var __all__
# List of symbols exported by "from produtil.run import *"
__all__=['alias','exe','run','runstr','mpi','mpiserial','mpirun',
         'runbg','prog','mpiprog','mpiimpl','waitprocs','runsync',
         'InvalidRunArgument','ExitStatusException','checkrun',
         'batchexe','bigexe']

##@var module_logger
# Default logger used by some functions if no logger is given
module_logger=logging.getLogger('produtil.run')

class InvalidRunArgument(produtil.prog.ProgSyntaxError):
    """!Raised to indicate that an invalid argument was sent into one
    of the run module functions."""

class ExitStatusException(Exception):
    """!Raised to indicate that a program generated an invalid return
    code.

    Examine the "returncode" member variable for the returncode value.
    Negative values indicate the program was terminated by a signal,
    while zero and positive values indicate the program exited. The
    highest exit status of the pipeline is returned when a pipeline is
    used.

    For MPI programs, the exit status is generally unreliable due to
    implementation-dependent issues, but this package attempts to
    return the highest exit status seen. Generally, you can count on
    MPI implementations to return zero if you call MPI_Finalize() and
    exit normally, and non-zero if you call MPI_Abort with a non-zero
    argument. Any other situation will produce unpredictable results."""
    ##@var message
    # A string description for what went wrong

    ##@var returncode
    # The return code, including signal information.

    def __init__(self,message,status):
        """!ExitStatusException constructor
        @param message a description of what went wrong
        @param status the exit status"""
        self.message=message
        self.returncode=status

    @property
    def status(self):
        """!An alias for self.returncode: the exit status."""
        return self.returncode

    def __str__(self):
        """!A string description of the error."""
        return '%s (returncode=%d)'%(str(self.message),int(self.returncode))

    def __repr__(self):
        """!A pythonic description of the error for debugging."""
        return 'ExitStatusException(%s,%s)'%(repr(self.message),repr(self.returncode))

def alias(arg):
    """!Attempts to generate an unmodifiable "copy on write" version
    of the argument. The returned copy will generate a modifiable
    duplicate of itself if you attempt to change it.
    @returns a produtil.prog.ImmutableRunner for a Runner argument, or
      the same mpiprog.MPIRanksBase object with its runners made immutable
    @param arg a produtil.prog.Runner (such as from exe()) or an
      mpiprog.MPIRanksBase (such as from mpi() or mpiserial())"""
    if isinstance(arg,prog.Runner):
        return prog.ImmutableRunner(arg)
    elif isinstance(arg,mpiprog.MPIRanksBase):
        arg.make_runners_immutable()
        return arg
    else:
        raise InvalidRunArgument(
            'Arguments to alias() must be Runner objects (such as from '
            'exe()) or MPIRanksBase objects (such as from mpi() or '
            'mpiserial()). Got: %s'%(repr(arg),))

def batchexe(name,**kwargs):
    """!Returns a prog.ImmutableRunner object that represents a small
    serial program that can be safely run on a busy batch node.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""
    return prog.ImmutableRunner([str(name)],**kwargs)

def exe(name,**kwargs):
    """!Returns a prog.ImmutableRunner object that represents a large
    serial program that must be run on a compute node.
    @note This function does NOT search $PATH on Cray. That ensures
      the $PATH will be expanded on the compute node instead. Use
      produtil.fileop.find_exe() if you want to explicitly search the
      PATH before execution.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""
    return mpiimpl.make_bigexe(str(name),**kwargs)

def bigexe(name,**kwargs):
    """!Alias for exe() for backward compatibility. Use exe() instead."""
    return exe(name,**kwargs)

def mpirun(arg,**kwargs):
    """!Converts an MPI program specification into a runnable shell
    program suitable for run(), runstr() or checkrun().

    Options for kwargs:
    * allranks=True: run on all available MPI ranks. This cannot be
      used if a specific number of ranks (other than 1) was requested in
      the arg.
    * logger=L: a logging.Logger for log messages
    * Other platform-specific arguments. See produtil.mpi_impl for details.

    @param arg the mpiprog.MPIRanksBase describing the MPI program to
      run. This is the output of the mpi() or mpiserial() function.
    @param kwargs additional options, as listed above.
    @returns a prog.Runner object for the specified
      mpiprog.MPIRanksBase object."""
    return mpiimpl.mpirunner(arg,**kwargs)

def make_pipeline(arg,capture,**kwargs):
    """!This internal implementation function generates a
    pipeline.Pipeline object for the specified input, which may be a
    prog.Runner or mpiprog.MPIRanksBase.
    @param arg the produtil.prog.Runner to convert (the output of
      exe(), bigexe() or mpirun()), or an mpiprog.MPIRanksBase from
      mpi() or mpiserial()
    @param capture if True, capture the stdout into a string
    @param kwargs additional keyword arguments, same as for mpirun()
    @returns a produtil.pipeline.Pipeline"""
    if isinstance(arg,prog.Runner):
        runner=arg
    elif isinstance(arg, mpiprog.MPIRanksBase):
        runner=mpiimpl.mpirunner(arg,**kwargs)
    else:
        raise InvalidRunArgument(
            'Can only run a Runner object (such as from exe()) or an '
            'MPIRanksBase object (such as from mpi() or mpiserial()). '
            'Got: %s'%(repr(arg),))
    logger=kwargs.get('logger',None)
    if logger is not None:
        logger.info('Starting: %s'%(repr(arg),))
        if capture: logger.info(' - and will capture output.')
    pl=pipeline.Pipeline(runner,capture=capture,logger=logger)
    if logger is not None:
        logger.debug('Pipeline is %s'%(repr(pl),))
    return pl

def runbg(arg,capture=False,**kwargs):
    """!Not implemented: background execution

    Runs the specified process in the background. Specify
    capture=True to capture the command's output. Returns a
    produtil.pipeline.Pipeline. Call poll() to determine process
    completion, and use the stdout_data property to get the output
    after completion, if capture=True was specified.

    @bug produtil.run.runbg() is not implemented

    @warning this is not implemented

    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param capture if True, capture output
    @param kwargs same as for mpirun()"""
    p=make_pipeline(arg,capture,**kwargs)
    p.background()
    return p

def waitprocs(procs,logger=None,timeout=None,usleep=1000):
    """!Not implemented: background process monitoring

    Waits for one or more backgrounded processes to complete. Logs to
    the specified logger while doing so. If a timeout is specified,
    returns False after the given time if some processes have not
    returned. The usleep argument is the number of microseconds to
    sleep between checks (can be a fraction). The first argument,
    procs, specifies the processes to check. It must be a
    produtil.pipeline.Pipeline (return value from runbg) or an iterable
    (list or tuple) of such.

    @bug produtil.run.waitprocs() is untested

    @warning This is not tested and probably does not work.

    @param procs the processes to watch
    @param logger the logging.Logger for log messages
    @param timeout how long to wait, in seconds, before giving up
    @param usleep sleep time between checks, in microseconds
    @returns True if all processes finished, False on timeout"""
    p=set()
    if isinstance(procs,(pipeline.Pipeline,prog.PopenCommand)):
        p.add(procs)
    else:
        for pp in procs:
            p.add(pp)
    if logger is not None: logger.info("Wait for: %s",repr(p))
    start=time.time()
    while p: # keep looping as long as there are unfinished processes
        p2=set()
        for proc in p:
            ret=proc.poll()
            if ret is not None:
                if logger is not None:
                    logger.info("%s returned %s"%(repr(proc),repr(ret)))
            else:
                if logger is not None and usleep>4.99999e6:
                    # babble about running processes if the sleep time is long.
                    logger.info("%s is still running"%(repr(proc),))
                p2.add(proc) # still running: check it again next time
        p=p2

        if not p: break # done! no need to sleep...
        if timeout is not None and time.time()-start>=timeout:
            break # give up; unfinished processes make us return False

        if usleep>4.99999e6 and logger is not None:
            # babble about sleeping if the sleep time is 5sec or longer:
            logger.info("... sleep %f ..."%(float(usleep/1.e6),))
        time.sleep(usleep/1.e6)
    return False if(p) else True

def runsync(logger=None):
    """!Runs the "sync" command as an exe()."""
    return mpiimpl.runsync(logger=logger)

def run(arg,logger=None,sleeptime=None,**kwargs):
    """!Executes the specified program and attempts to return its exit
    status. In the case of a pipeline, the highest exit status seen
    is returned. For MPI programs, exit statuses are unreliable and
    generally implementation-dependent, but it is usually safe to
    assume that a program that runs MPI_Finalize() and exits normally
    will return 0, and anything that runs MPI_Abort(MPI_COMM_WORLD)
    will return non-zero. Programs that exit due to a signal will
    return statuses >255 and can be interpreted with WTERMSIG,
    WIFSIGNALED, etc.
    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param sleeptime time to sleep between checks of child process
    @param kwargs ignored"""
    p=make_pipeline(arg,False,logger=logger)
    p.communicate(sleeptime=sleeptime)
    result=p.poll()
    if logger is not None:
        logger.info(' - exit status %d'%(int(result),))
    return result

def checkrun(arg,logger=None,**kwargs):
    """!This is a simple wrapper around run() that raises
    ExitStatusException if the program exit status is non-zero (or,
    when ret is given, not one of the listed statuses).

    @param arg the produtil.prog.Runner to execute (output of
      exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param kwargs The optional ret=[] argument can provide a different
      list of acceptable exit statuses."""
    r=run(arg,logger=logger)
    if 'ret' in kwargs:
        if r not in kwargs['ret']:
            raise ExitStatusException('%s: unexpected exit status'%(repr(arg),),r)
    elif r!=0:
        raise ExitStatusException('%s: non-zero exit status'%(repr(arg),),r)
    return r

def openmp(arg,threads=None):
    """!Sets the number of OpenMP threads for the specified program.

    @warning Generally, when using MPI with OpenMP, the batch system
    must be configured correctly to handle this or unexpected errors
    will result.

    @param arg The "arg" argument must be from mpiserial, mpi, exe or
      bigexe.

    @param threads The optional "threads" argument is an integer number of
      threads. If it is not specified, the maximum possible number of
      threads will be used. Note that using threads=None with
      mpirun(...,allranks=True) will generally not work unless the batch
      system has already configured the environment correctly for an
      MPI+OpenMP task with default maximum threads and ranks.
    @returns see run()"""
    return mpiimpl.openmp(arg,threads)

def runstr(arg,logger=None,**kwargs):
    """!Executes the specified program or pipeline, capturing its
    stdout and returning that as a string.

    If the exit status is non-zero (or, when ret is given, not one of
    the listed statuses), then ExitStatusException is raised.

    Example:
    @code
      runstr(exe('false'),ret=(1,))
    @endcode

    succeeds if "false" returns 1, and raises ExitStatusException otherwise.

    @param arg The "arg" argument must be from mpiserial, mpi, exe or
      bigexe.
    @param logger a logging.Logger for logging messages
    @param kwargs You can specify an optional list or tuple "ret" that
      contains an alternative list of valid return codes. Normal exit
      statuses are zero or positive; negative values represent
      signal-terminated programs (e.g. SIGTERM produces -15, SIGKILL
      produces -9)."""
    p=make_pipeline(arg,True,logger=logger)
    s=p.to_string()
    r=p.poll()
    if 'ret' in kwargs:
        if r not in kwargs['ret']:
            raise ExitStatusException('%s: unexpected exit status'%(repr(arg),),r)
    elif r!=0:
        raise ExitStatusException('%s: non-zero exit status'%(repr(arg),),r)
    return s

def mpi(arg,**kwargs):
    """!Returns an MPIRank object that represents the specified MPI
    executable.
    @param arg the MPI program to run
    @param kwargs logger=L for a logging.Logger to log messages"""
    return mpiprog.MPIRank(arg,**kwargs)

def mpiserial(arg,**kwargs):
    """!Generates an mpiprog.MPISerial object that represents an MPI
    rank that executes a serial (non-MPI) program. The given value
    MUST be from bigexe() or exe(), NOT from mpi().
    @param arg the serial program to run, from exe() or bigexe()
    @param kwargs logger=L for a logging.Logger to log messages"""
    return mpiprog.MPISerial(arg,**kwargs)