HWRF  trunk@4391
prog.py
1 """!Implements the produtil.run: provides the object tree for
2 representing shell commands.
3 
4 Do not load this module directly except for type checking
5 (isinstance(o,produtil.prog.Runner)). It is meant to be used only by
6 the produtil.run module. This module is part of the implementation of
7 a shell-like syntax for running programs. The rest of the
8 implementation is in the produtil.run and produtil.pipeline modules.
9 MPI programs are implemented by the produtil.mpiprog and
10 produtil.mpi_impl.
11 
12 This module implements a shell-like syntax of running shell programs
13 from Python. This module should not be used directly: the
14 produtil.run implements critical parts of the functionality.
15 Specifically, this module implements the Runner and ImmutableRunner
16 classes. It also knows how to convert them to
17 produtil.pipeline.Pipeline objects for actual execution.
18 
19 * Runner --- This class represents a process that could be run. It keeps
20  track of all possible aspects of running a process, including the
21  command, arguments, environment variables, stdout stream, stderr
22  stream, stdin stream, and a list of functions or callable objects to
23  run before executing the program. Provides public functions to modify
24  the Runner.
25 
26 * ImmutableRunner --- A Runner that cannot be changed: when modifying the
27  Runner, it returns a new object. This is to implement shell aliases.
28  For example, one could make an ImmutableRunner for program to index
29  GRIB2 files. All the user would have to do is add the GRIB2 file as
30  an argument, and capture the output.
31 
32 Note that the actual work of creating the Runner or ImmutableRunner,
33 or turning them into Pipeline objects, is done by the produtil.run module.
34 Turning MPI programs into Runner objects is done by the
35 produtil.mpiprog module and produtil.mpi_impl package, with the public
36 interface in produtil.run. Hence, nobody would ever load this module
37 directly, except for type checking (ie.: to see if your argument is a
38 Runner before passing it to produtil.run.checkrun)."""
39 
40 import produtil.sigsafety
41 import StringIO,select,io,re,time,fcntl,os,logging,signal
42 
43 import produtil.mpi_impl
44 from produtil.pipeline import launch, manage, PIPE, ERR2OUT
45 
class ProgSyntaxError(Exception):
    """!Base class of exceptions raised when a Runner is given
    arguments that make no sense."""

# NOTE(review): the "class" statements of the five subclasses below
# were missing from this copy of the file (only their docstrings
# survived).  MultipleStdin, MultipleStdout, MultipleStderr and
# InvalidPipeline are the names raised by Runner; the name
# OverspecifiedStream and the exact base classes are reconstructed
# from the docstrings -- confirm against the upstream repository.
class OverspecifiedStream(ProgSyntaxError):
    """!Raised when one tries to specify the stdout, stderr or stdin to
    go to, or come from, more than one location"""

class MultipleStdin(OverspecifiedStream):
    """!Raised when the caller specifies more than one source for the
    stdin of a Runner"""

class MultipleStdout(OverspecifiedStream):
    """!Raised when the caller specifies more than one destination for
    a Runner's stdout"""

class MultipleStderr(OverspecifiedStream):
    """!Raised when the caller specifies more than one destination for
    a Runner's stderr"""

class InvalidPipeline(ProgSyntaxError):
    """!Raised when the caller specifies an invalid input or output
    when piping a Runner into or out of another object."""
64 
class NotValidPosixSh(Exception):
    """!Base class of exceptions that are raised when converting a
    Runner or pipeline of Runners to a POSIX sh command, if the Runner
    cannot be expressed as POSIX sh."""

# NOTE(review): the "class NoSuchRedirection" statement was missing
# from this copy of the file; the name is the one raised by
# FileOpener.to_shell and the base class is taken from the docstring
# of NotValidPosixSh -- confirm against the upstream repository.
class NoSuchRedirection(NotValidPosixSh):
    """!Raised when trying to convert a pipeline of Runners to a POSIX
    sh string, if a redirection in the pipeline cannot be expressed in
    POSIX sh."""

# The three exceptions below are all raised while converting to POSIX
# sh, so they derive from NotValidPosixSh.  This keeps the documented
# contract ("NotValidPosixSh or a subclass thereof") true; they are
# still Exceptions, so existing "except Exception" handlers still work.
class NotValidPosixShString(NotValidPosixSh):
    """!Raised when converting a Runner or pipeline of Runners to a
    POSIX sh string.  If a string is sent to a program's stdin, this
    is raised when that string cannot be expressed in POSIX sh."""

class EqualInExecutable(NotValidPosixSh):
    """!Raised when converting a Runner or pipeline of Runners to a
    posix sh string if a Runner's executable contains an equal ("=")
    sign."""

class EqualInEnv(NotValidPosixSh):
    """!Raised when converting a Runner or pipeline of Runners to a
    POSIX sh string if there is an equal ("=") sign in an environment
    variable name."""
85 
def shvarok(s):
    """!Returns True if the specified environment variable name is a
    valid POSIX sh variable name, and False otherwise.

    A valid name consists of ASCII letters, digits and underscores,
    and does not start with a digit.
    @param s an environment variable name
    @returns True if s is a valid name, False otherwise"""
    # Python spells the end-of-string anchor "\Z".  The previous
    # pattern used "\z", which Python 2's re treats as a literal "z",
    # so names not ending in "z" were rejected.
    return re.search(r'\A[A-Za-z_][A-Za-z0-9_]*\Z',s) is not None
94 
def shstrok(s):
    """!Tells whether a string can be expressed as a POSIX sh string.

    A string qualifies when every character is printable ASCII:
    the space character through the tilde, chr(32)-chr(126).  The
    empty string qualifies.
    @param s a string
    @returns True if s is expressible, False otherwise"""
    # The range " -~" spans exactly chr(32) through chr(126).
    only_printable=re.match(r'[ -~]*\Z',s)
    return only_printable is not None
104 
def shbackslash(s):
    """!Given a Python str, returns a backslashed POSIX sh string, or
    raises NotValidPosixShString if that cannot be done.

    Strings made only of letters, digits and the characters _+.,/-
    are returned unchanged.  Anything else is wrapped in double
    quotes, with a backslash placed before each character that is
    still special inside double quotes.
    @param s a string to backslash
    @returns the quoted string
    @throws NotValidPosixShString if shstrok(s) is false"""
    if not shstrok(s):
        raise NotValidPosixShString('String is not expressable in POSIX sh: %s'%(repr(s),))
    if s=='':
        # An unquoted empty string would vanish from the command line.
        return '""'
    if re.search(r'(?ms)[^a-zA-Z0-9_+.,/-]',s):
        # Inside double quotes the shell still interprets the double
        # quote, backslash, dollar sign and backquote.  The backquote
        # was previously left unescaped, which let the shell perform
        # command substitution on the string.
        return '"' + re.sub(r'(["\\$`])',r"\\\1",s) + '"'
    return s
114 
115 ########################################################################
116 
class StreamGenerator(object):
    """!Abstract base of the objects that describe where a Runner's
    stdin, stdout or stderr is connected.

    This is part of the internal implementation of Runner, and is
    used to convert it to a produtil.pipeline.Pipeline for execution.
    Subclasses create the Popen's stdout, stdin and stderr."""
    def for_input(self):
        """!Debugging aid: returns a marker string containing the
        repr of this object."""
        return '<unexpected:%r>'%(self,)
    def for_output(self):
        """!Debugging aid: returns a marker string containing the
        repr of this object."""
        return '<unexpected:%r>'%(self,)
    def repr_for_err(self):
        """!Returns the stderr representation.  By default this is
        whatever repr_for_out() returns, so stderr is described the
        same way as stdout.  Subclasses must provide repr_for_out()."""
        as_stdout=self.repr_for_out()
        return as_stdout
133 
# NOTE(review): the "class FileOpener" statement was missing from this
# copy of the file.  The class name comes from FileOpener.copy and
# Runner.inp; the StreamGenerator base class is presumed from the
# StreamGenerator docstring -- confirm against the upstream repository.
class FileOpener(StreamGenerator):
    """!This is part of the internal implementation of Runner, used to
    convert it to a produtil.pipeline.Pipeline for execution.  It
    represents stdin, stdout or stderr being connected to an open
    file.  It instructs the Runner to open the file before starting
    the process."""
    def __init__(self,filename,mode,err=False):
        """!FileOpener constructor
        @param filename the name of the file being opened
        @param mode how it is being opened
        @param err if True, this is for stderr"""
        self.filename=filename
        self.mode=mode
        self.err=err
    ##@var filename
    # the name of the file being opened

    ##@var mode
    # how the file is being opened

    ##@var err
    # If True, this is for stderr.

    def copy(self):
        """!Creates a shallow copy of this object."""
        return FileOpener(self.filename,self.mode,self.err)
    def to_shell(self):
        """!Creates a POSIX sh representation of the part of the
        command that requests redirection.
        @returns ">> file" for mode "ab", "> file" for mode "wb" (both
          prefixed with "2" when err is True), or "cat file | " for
          mode "rb"
        @throws NoSuchRedirection for any other mode"""
        more=''
        if self.err:
            more='2'
        if self.mode=='ab':   return '%s>> %s'%(more,shbackslash(self.filename))
        elif self.mode=='wb': return '%s> %s'%(more,shbackslash(self.filename))
        elif self.mode=='rb': return 'cat %s | '%(shbackslash(self.filename),)
        raise NoSuchRedirection('Cannot convert file open mode %s to a '
                                'POSIX sh redirection'%(self.mode,))
    @property
    def intmode(self):
        """!Returns an integer version of mode suitable for os.open.
        The mode is checked for "r", then "w", then "a".
        @throws ValueError if the mode contains none of r, w or a"""
        if 'r' in self.mode:
            return os.O_RDONLY
        if 'w' in self.mode:
            return os.O_WRONLY|os.O_CREAT|os.O_TRUNC
        if 'a' in self.mode:
            return os.O_WRONLY|os.O_CREAT|os.O_APPEND
        # Raise explicitly instead of using assert, which is stripped
        # when Python runs with -O.
        raise ValueError('Unsupported file open mode %s'%(repr(self.mode),))
    def _gen_stream(self):
        """!Opens the file with os.open and returns a tuple
        (None,fd,None,True) where fd is the integer file descriptor."""
        # os.open defaults to permission bits 0777, which would create
        # executable output files.  Request 0666 (further reduced by
        # the umask), the same as the builtin open().
        return (None,os.open(self.filename,self.intmode,0o666),None,True)
    def __repr__(self):
        """!Returns a string representation of this object as valid
        Python code.  The err flag is included only when it is True."""
        if self.err:
            return 'FileOpener(%s,%s,True)'%(repr(self.filename),repr(self.mode))
        return 'FileOpener(%s,%s)'%(repr(self.filename),repr(self.mode))
    def repr_for_in(self):
        """!Part of the implementation of Runner.__repr__, this returns
        the filename and ",string=False"."""
        return repr(self.filename)+',string=False'
    def repr_for_out(self):
        """!Part of the implementation of Runner.__repr__, this returns
        the filename followed by ",append=X" where X is the true/false
        flag for appending to the file."""
        return '%s,append=%s'%(repr(self.filename),repr(self.mode=='ab'))
    def repr_for_err(self):
        """!Same as repr_for_out."""
        return self.repr_for_out()
204 
# NOTE(review): the "class StringInput" statement was missing from
# this copy of the file.  The class name comes from StringInput.copy
# and Runner.inp; the StreamGenerator base class is presumed --
# confirm against the upstream repository.
class StringInput(StreamGenerator):
    """!Represents sending a string to a process's stdin."""
    def __init__(self,obj):
        """!Creates a StringInput that sends the specified object to
        stdin.
        @param obj the object to send to stdin"""
        self.obj=obj
    ##@var obj
    # the object to send to stdin

    def copy(self):
        """!Returns a shallow copy of this object."""
        return StringInput(self.obj)
    def _gen_stream(self):
        """!Returns a tuple containing (O,None,None,None) where O was
        the object sent to the StringInput constructor."""
        return (self.obj,None,None,None)
    def __repr__(self):
        """!Returns a string representation of this object as valid
        Python code."""
        return 'StringInput(%s)'%(repr(self.obj),)
    def to_shell(self):
        """!Converts this object, if possible, to an echo command
        followed by a pipe ("|").
        @throws NotValidPosixShString (raised by shbackslash) if the
          string cannot be expressed in POSIX sh"""
        return 'echo %s | '%(shbackslash(self.obj))
    def repr_for_in(self):
        """!Part of the implementation of Runner.__repr__.  If
        possible, this creates valid Python code to represent
        specifying sending the given string to the stdin of a Runner.
        If the string is longer than 40 characters, only its first 37
        characters are shown, followed by an ellipsis."""
        if len(self.obj)>40:
            return "%s...,string=True"%(repr(self.obj[0:37]+'...'),)
        else:
            return '%s,string=True'%(repr(self.obj),)
239 
# NOTE(review): the "class StreamReuser" statement was missing from
# this copy of the file.  The class name comes from StreamReuser.copy
# and Runner.out.  StreamGenerator is used as the base class because
# Runner.__repr__ calls repr_for_err() on stderr objects and this
# class defines none of its own -- confirm against the upstream
# repository.
class StreamReuser(StreamGenerator):
    """!Arranges for a stream-like object to be sent to the stdout,
    stderr or stdin of a Runner."""
    def __init__(self,obj):
        """!Creates a StreamReuser for the specified stream-like object.
        @param obj the stream-like object to reuse."""
        self.obj=obj
    ##@var obj
    # the stream-like object to reuse.
    def copy(self):
        """!Returns a shallow copy of this object.  Note that means
        that the underlying stream object is not copied."""
        return StreamReuser(self.obj)
    def to_shell(self):
        """!Raises NotValidPosixSh to indicate that the stream cannot
        be represented as POSIX sh."""
        raise NotValidPosixSh('Python streams cannot be passed to '
                              'remote POSIX sh processes.')
    def _gen_stream(self):
        """!Returns a tuple (None,None,obj,False) where obj is the
        provided stream-like object."""
        return (None,None,self.obj,False)
    def repr_for_in(self):
        """!Returns repr(obj) where obj is the given stream-like
        object."""
        return repr(self.obj)
    def repr_for_out(self):
        """!Returns repr(obj) where obj is the given stream-like
        object."""
        return repr(self.obj)
270 
# NOTE(review): the "class OutIsError" statement was missing from this
# copy of the file.  The class name comes from OutIsError.copy and
# Runner.err2out; the StreamGenerator base class is presumed --
# confirm against the upstream repository.
class OutIsError(StreamGenerator):
    """!Instructs a Runner to send stderr to stdout"""
    def __init__(self):
        """!OutIsError constructor."""
    def copy(self):
        """!Returns a new OutIsError object."""
        return OutIsError()
    def to_shell(self):
        """!Returns "2>&1" """
        return '2>&1'
    def _gen_stream(self):
        """!Returns a tuple containing (None,None,pipeline.ERR2OUT,False)"""
        return (None,None,ERR2OUT,False)
    def repr_for_in(self):
        """!This should never be called.  It returns ".err2out()"."""
        return '.err2out()'
    def repr_for_out(self):
        """!Part of the representation of Runner.__repr__.  Returns
        ".err2out()" which instructs a Runner to send stderr to
        stdout."""
        return '.err2out()'
    def __eq__(self,other):
        """!Is the other object an OutIsError?
        @param other the other object to analyze."""
        return isinstance(other,OutIsError)
    def __ne__(self,other):
        """!Inverse of __eq__.  Python 2 does not derive != from ==,
        so it is defined explicitly.
        @param other the other object to analyze."""
        return not isinstance(other,OutIsError)
296 
297 ########################################################################
298 
299 class Runner(object):
300  """!Represents a single stage of a pipeline to execute.
301 
302  This is a linked list class used to store information about a
303  program or pipeline of programs to be run. It has the capability
304  of converting itself to a Pipeline object (run(Runner)), or
305  converting itself to a POSIX sh command (Runner.to_shell()). Note
306  that some commands cannot be represented in POSIX sh, such as
307  commands with non-ASCII characters or commands that have Python
308  streams as their stdout or stdin. Those commands can still be run
309  with a Pipeline, but trying to convert them to a POSIX sh command
310  will throw NotValidPosixSh or a subclass thereof."""
311  def __init__(self,args,**kwargs):
312  """!Creates a new Runner.
313 
314  The only non-keyword argument can be one of three things:
315 
316  1. A Runner to copy. Every aspect of the Runner that can be
317  copied will be. Note that if a stream-like object is
318  connected to stdin, stdout or stderr, it will NOT be
319  copied.
320 
321  2. A list of strings. This will be used as the command
322  path, and arguments.
323 
324  Many options can be set via keyword arguments:
325 
326  * clearenv=True - the environment should be cleared before
327  running this command. Any arguments set by the env=
328  keyword or the .env(...) member function ignore this.
329  Also, PATH, USER, LOGNAME and HOME are retained since
330  most programs cannot run without them.
331 
332  * env=dict(var=value,...) - a dict of environment variables to
333  set before running the Runner. Does NOT affect this
334  parent's process, only the child process.
335 
336  * in=filename - a file to send to stdin.
337 
338  * instr=str - a string to send to stdin
339 
340  * out=filename - a file to connect to stdout. Will truncate the file.
341 
342  * outa=filename - same as "out=filename," but appends to the file.
343 
344  * err2out - redirects stderr to stdout
345 
346  * err=filename - a file to connect to stderr. Will truncate the file.
347 
348  * erra=filename - same as "err=filename," but appends to the file.
349 
350  * prerun=[obj,anotherobj,...] - sent to self.prerun, this is a
351  list of functions or callable objects to run before
352  executing the process. The objects are not called until
353  execution is requested via self._gen.
354  @param args the arguments to the program
355  @param kwargs other settings (see constructor description)."""
356 
357  self._stdin=self._stdout=self._stderr=self._prev=self._env=None
358  self._prerun=self._cd=None
359  self._threads=None
360 
361  if isinstance(args,Runner):
362  r=args # other runner to copy
363  self._args=list(r._args)
364  if(r._stdin is not None): self._stdin=r._stdin.copy()
365  if(r._stdout is not None): self._stdout=r._stdout.copy()
366  if(r._stderr is not None): self._stderr=r._stderr.copy()
367  if(r._prev is not None): self._prev=r._prev.copy()
368  if(r._env is not None): self._env=dict(r._env)
369  if(r._prerun is not None): self._prerun=list(r._prerun)
370  self._copy_env=r._copy_env
371  else:
372  if not isinstance(args,list):
373  raise TypeError('The args argument must be a list, not a %s %s.'%(
374  type(args).__name__,repr(args)))
375  if not isinstance(args[0],basestring):
376  raise TypeError('The first element of args must be a string, not a %s %s.'%(
377  type(args[0]).__name__,repr(args)))
378  self._args=args
379  self._copy_env=True
380 
381  # Initialize environment if requested:
382  if 'clearenv' in kwargs and kwargs['clearenv']: self.clearenv()
383  if 'env' in kwargs: self._env=dict(kwargs['env'])
384 
385  # Initialize input/output/error if requested:
386  if 'in' in kwargs: self<kwargs['in']
387  if 'instr' in kwargs: self<<str(kwargs['instr'])
388  if 'out' in kwargs: self>kwargs['out']
389  if 'outa' in kwargs: self>>kwargs['outa']
390  if 'err2out' in kwargs: self.err2out()
391  if 'err' in kwargs: self.err(kwargs['err'],append=False)
392  if 'erra' in kwargs: self.err(kwargs['erra'],append=True)
393  if 'cd' in kwargs: self.cd(kwargs['cd'])
394 
395  # Allow a list of "prerun" callables that will be called at
396  # the beginning of self._gen:
397  if 'prerun' in kwargs: self.prerun(kwargs['prerun'])
398 
399  def getthreads(self):
400  """!Returns the number of threads requested by this program."""
401  return self._threads
402  def setthreads(self,nthreads):
403  """!Sets the number of threads requested by this program."""
404  self._threads=int(nthreads)
405  return self._threads
406  def delthreads(self):
407  """!Removes the request for threads."""
408  self._threads=None
409 
410  threads=property(getthreads,setthreads,delthreads,"""The number of threads per rank.""")
411 
412  @property
413  def first(self):
414  """!Returns the first Runner in this pipeline."""
415  if self._prev is None:
416  return self
417  return self._prev.first
418 
419  def prerun(self,arg):
420  """!Adds a function or callable object to be called before
421  running the program.
422 
423  The callables should be very fast operations, and are executed
424  by self._gen when creating the Pipeline. They take, as an
425  argument, the Runner and an optional "logger" keyword argument
426  that is either None, or a logging.Logger to use to log
427  messages.
428  @param arg a callable object that takes self as an argument, and
429  an optional keyword argument "logger" with a logging.Logger for
430  log messages"""
431  if self._prerun is None:
432  self._prerun=[arg]
433  else:
434  self._prerun.append(arg)
435  def _stringify_arg(self,arg):
436  """!Returns a string representation of the given argument to
437  convert it to an input to produtil.pipeline.Pipeline.
438  Conversions:
439  * float --- converted via %g
440  * int --- converted via %d
441  * basestring --- no conversion; used directly
442  * all others --- str(arg)
443  @param arg the argument to convert
444  @returns a string version of arg"""
445  if isinstance(arg,float):
446  return '%g'%(arg,)
447  elif isinstance(arg,int):
448  return '%d'%(arg,)
449  elif isinstance(arg,basestring):
450  return arg
451  else:
452  return str(arg)
453  def __getitem__(self,args):
454  """!Add one or more arguments to the executable.
455 
456  Can ONLY accept strings, ints, floats or iterables (tuple,
457  list). Strings, ints and floats are sent to _stringify_args,
458  and the result is added to the end of the list of arguments to
459  the command to run. For iterables (tuple, list), adds all
460  elements to the list of arguments, passing each through
461  _stringify_args.
462  @param args one or more arguments to add
463  @returns self"""
464  if isinstance(args,basestring) or isinstance(args,float) \
465  or isinstance(args,int):
466  self._args.append(self._stringify_arg(args))
467  else:
468  self._args.extend([self._stringify_arg(x) for x in args])
469  return self
470  def __str__(self):
471  """!Alias for __repr__()"""
472  return self.__repr__()
473  def __repr__(self):
474  """!Attempts to produce valid Python code to represent this
475  Runnable. Generally, that can be done, unless an input string
476  is too long, or a stream is connected to a Python object. In
477  those cases, human-readable representations are given, which
478  are not exactly Python code."""
479  if self._prev is not None:
480  s='%s | '%(repr(self._prev),)
481  else:
482  s=''
483  if len(self._args)==0:
484  s+='exe(<empty>)'
485  else:
486  s+='exe(%s)'%(repr(self._args[0]))
487  if len(self._args)>1:
488  s+='['+','.join([repr(x) for x in self._args[1:]])+']'
489  if self._stdin is not None:
490  s+='.in(%s)'%(self._stdin.repr_for_in(),)
491  if self._stdout is not None:
492  s+='.out(%s)'%(self._stdout.repr_for_out(),)
493  if self._stderr is not None:
494  if isinstance(self._stderr,OutIsError):
495  s+='.err2out()'
496  else:
497  s+='.err(%s)'%(self._stderr.repr_for_err(),)
498  if not self._copy_env:
499  s+='.clearenv()'
500  if self._env is not None:
501  s+='.env('+(', '.join(['%s=%s'%(k,v)
502  for k,v in self._env.iteritems()]))+')'
503  if self._prerun is not None:
504  s+=''.join(['.prerun(%s)'%(repr(x),) for x in self._prerun])
505  if self._cd is not None:
506  s+=".cd("+repr(self._cd)+")"
507  return s
508 
509  def __eq__(self,other):
510  """!Returns True if the other object is a Runner that is equal
511  to this one, and False otherwise.
512  @param other the object to compare"""
513  if isinstance(other,Runner):
514  return self._args==other._args and \
515  self._copy_env==other._copy_env and \
516  self._stdin==other._stdin and \
517  self._stdout==other._stdout and \
518  self._stderr==other._stderr and \
519  self._env==other._env and \
520  self._prerun==other._prerun
521  else:
522  return NotImplemented
523 
524  def isplainexe(self):
525  """!Returns true if this is simply an executable with arguments
526  (no redirection, no prerun objects, no environment
527  modification, no piping), and False otherwise."""
528  return self._stdin is None and self._stdout is None and \
529  self._prerun is None and self._stderr is None and \
530  self._env is None and self._prev is None
531 
532  def cd(self,dirpath):
533  """!Requests that this process run in the specified directory.
534  The directory must already exist before the program starts.
535  @param dirpath the directory to cd into, which must already exist.
536  @returns self"""
537  self._cd=dirpath
538  return self
539  def __lt__(self,stdin):
540  """!Connects the given object to stdin, via inp(stdin,string=False).
541  @param stdin the stdin object
542  @returns self"""
543  return self.inp(stdin,string=False)
544  def __gt__(self,stdout):
545  """!Connects the given object to stdout, truncating it if it is
546  a file. Same as out(stdout,append=False).
547  @param stdout the stdout object
548  @returns self"""
549  return self.out(stdout,append=False)
550  def __lshift__(self,stdin):
551  """!Sends the specified string into stdin. Same as
552  inp(stdin,string=True).
553  @param stdin the stdin file
554  @returns self"""
555  return self.inp(stdin,string=True)
556  def __rshift__(self,stdout):
557  """!Appends stdout to the specified file. Same as
558  out(stdout,append=True).
559  @param stdout the stdout file
560  @returns self"""
561  return self.out(stdout,append=True)
562  def __pos__(self):
563  """!Sends stderr to stdout. Same as err2out().
564  @returns self"""
565  return self.err2out()
566  def __ge__(self,outerr):
567  """!Redirects stderr and stdout to the specified file,
568  truncating it. Same as err2out().out(filename,append=False)
569  @param outerr the stdout and stderr file
570  @returns self"""
571  return self.err2out().out(outerr,append=False)
572  def __or__(self,other):
573  """!Pipes this Runner to the other Runner. Same as pipeto(other).
574  @returns other
575  @param other the other runner to pipe into"""
576  return self.pipeto(other)
577 
578  def argins(self,index,arg):
579  """!Inserts the specified argument before the given index.
580 
581  This function is intended for internal use only. It is used
582  to implement threading on Cray, where arguments relating to
583  threading have to be added after the Runner is generated.
584 
585  @warning It is generally not safe to call this function
586  outside the produtil.mpi_impl subpackage since its modules may
587  generate completely different commands than you asked in order
588  to execute your requested programs.
589 
590  @param arg a string argument to add
591  @param index the index to insert before
592  @note Index 0 is the executable, while later indices are
593  arguments."""
594  self._args.insert(index,arg)
595  return self
596 
597  def args(self):
598  """!Iterates over the executable and arguments of this command"""
599  for arg in self._args:
600  yield arg
601 
602  def copy(self,typeobj=None):
603  """!Returns a deep copy of this object, almost.
604 
605  If stdin, stdout or stderr are connected to streams instead of
606  files or strings, then the streams are not copied. Instead,
607  the exact same stream objects are connected to the same unit
608  in the new Runner.
609  @param typeobj the type of the new object or None for Runner.
610  Do not set this unless you know what you're doing.
611  @returns the new object"""
612  if typeobj is None: typeobj=Runner
613  assert(typeobj is not None)
614  r=typeobj(list(self._args))
615  r._copy_env=self._copy_env
616  if self._stdin is not None: r._stdin =self._stdin .copy()
617  if self._stdout is not None: r._stdout=self._stdout.copy()
618  if self._stderr is not None: r._stderr=self._stderr.copy()
619  if self._env is not None: r._env=dict(self._env)
620  if self._prev is not None: r._prev=self._prev.copy()
621  if self._prerun is not None:
622  r._prerun=list()
623  for p in self._prerun:
624  r._prerun.append(p)
625  assert(r is not None)
626  assert(isinstance(r,typeobj))
627  return r
628 
629  def copyenv(self):
630  """!Instructs this command to duplicate the parent process
631  environment (the default).
632  @returns self"""
633  self._copy_env=True
634  return self
635 
636  def clearenv(self):
637  """!Instructs this command to start with an empty environment
638  except for certain critical variables without which most
639  programs cannot run. (Retains PATH, USER, LOGNAME and HOME.)
640  @returns self"""
641  self._copy_env=False
642  self._env={}
643  return self
644 
645  def _impl_make_env(self):
646  """!This internal function generates information about the
647  environment variables to be input to this process. If the
648  parent environment is to be passed unmodified, None is
649  returned. Otherwise, this routine returns dict of environment
650  variables calculated from os.environ and internal settings.
651  @returns the new environment dict"""
652 
653  if self._env is None and self._copy_env:
654  return None # copy parent process environment verbatim
655  env={}
656  if self._copy_env:
657  env=dict(os.environ)
658  else:
659  env={}
660  for key in ('PATH','USER','LOGNAME','HOME'):
661  if key in os.environ:
662  env[key]=os.environ[key]
663  if self._env is not None:
664  for key in self._env:
665  env[key]=self._env[key]
666  return env
667 
668  def env(self,**kwargs):
669  """!Sets environment variables for this Runner. The variables
670  should be specified as keyword arguments.
671  @param kwargs varname=value arguments
672  @returns self"""
673  if self._env is None:
674  self._env={}
675  for key in kwargs:
676  self._env[str(key)]=str(kwargs[key])
677  return self
678 
679  def to_shell(self):
680  """!Returns a string that expresses this object as a POSIX sh
681  shell command if possible, or raises a subclass of
682  NotValidPosixSh if not."""
683  if self._prev is not None:
684  s=self._prev.to_shell()+' | '
685  elif self._stdin is not None:
686  s=self._stdin.to_shell()
687  else:
688  s=''
689  if self._cd is not None:
690  s+="( set -e ; cd "+shbackslash(self._cd)+" ; exec "
691  if self._env is not None or not self._copy_env:
692  if(re.search('=',self._args[0])):
693  raise EqualInExecutable(
694  '%s: cannot have an "=" in the executable name when '
695  'modifying the environment.'%(self._args[0],))
696  s+='env'
697  if not self._copy_env:
698  s+=' -i'
699  for key in self._env:
700  if(re.search('=',key)):
701  raise EqualInEnv('%s: variable name contains an "="'
702  %(key,))
703  s+=' '+shbackslash("%s=%s"%(key,self._env[key]))
704  if not self._copy_env:
705  for key in ('PATH','USER','LOGNAME','HOME'):
706  if not key in self._env:
707  s+=' "%s=$%s"'%(key,key)
708  s+=' '
709  s+=' '.join([shbackslash(x) for x in self._args])
710  if self._stdout is not None: s+=' '+self._stdout.to_shell()
711  if self._stderr is not None: s+=' '+self._stderr.to_shell()
712  if self._cd is not None:
713  s+=" )"
714  return s
715  def runner(self):
716  """!Returns self if self is modifiable, otherwise returns a
717  modifiable copy of self. This is intended to be used to
718  implement unmodifiable subclasses of Runner
719  @returns self"""
720  return self
721  def pipeto(self,other):
722  """!Specifies that this Runner will send its stdout to the
723  other runner's stdin. This will raise MultipleStdout if this
724  Runner's stdout target is already specified, or MultipleStdin
725  if the other's stdin is already specified.
726  @param other the runner to pipe into
727  @returns other"""
728  if not isinstance(other,Runner):
729  raise InvalidPipeline(
730  'Attempting to pipe a Runner into something that is not '
731  'a Runner (likely a syntax error).')
732  if other._prev is not None:
733  raise MultipleStdin('Attempted to pipe more than one process '
734  'into stdin of the same process')
735  if self._stdout is not None:
736  raise MultipleStdout('More than one stdout is detected in '
737  'prog|prog')
738  if other._stdin is not None and self._stdin is not None:
739  raise MultipleStdin('More than one stdin is detected in '
740  'prog|prog')
741 
742  # Get a modifiable copy of the other Runner and pipe to it:
743  rother=other.runner()
744  rother._prev=self
745  if rother._stdin is not None:
746  self.inp(rother._stdin)
747  rother._stdin=None
748 
749  # Return the other object since it is later in the pipeline
750  # (this is needed for syntactic reasons):
751  return rother
752 
753  def inp(self,stdin,string=False):
754  """!Specifies that the first Runner in this pipeline takes
755  input from the given file or string specified by stdin. If
756  string=True, then stdin is converted to a string via str(),
757  otherwise it must be a filename or a stream. Raises
758  MultipleStdin if the stdin source is already specified.
759  @param stdin the input file or string
760  @param string if True, stdin is a string. Otherwise, it is a file.
761  @returns self"""
762  if self._prev is not None:
763  self._prev.inp(stdin,string)
764  return self
765  # ---- to get past here, we have to be the beginning of the pipeline ----
766  if self._stdin is not None:
767  raise MultipleStdin('More than one stdin detected in Runner.inp')
768  if isinstance(stdin,StringInput) or isinstance(stdin,FileOpener) or\
769  isinstance(stdin,StreamReuser):
770  self._stdin=stdin
771  elif(string):
772  self._stdin=StringInput(str(stdin))
773  else:
774  if isinstance(stdin,basestring):
775  self._stdin=FileOpener(str(stdin),'rb')
776  else:
777  self._stdin=StreamReuser(stdin)
778  return self
779 
780  def out(self,stdout,append=False):
781  """!Specifies that this process sends output from its stdout
782  stream to the given file or stream. The stdout object must be
783  a string filename, or a stream. If append=False, and the
784  stdout is a filename, the file will be truncated, if
785  append=True then it is appended. Raises MultipleStdout if the
786  stdout location is already specified
787  @param stdout the stdout file
788  @param append if True, append to the file, otherwise truncate
789  @returns self"""
790  if self._stdout is not None:
791  raise MultipleStdout('More than one stdout detected in call '
792  'to Runner.out')
793  if isinstance(stdout,basestring):
794  if append:
795  self._stdout=FileOpener(str(stdout),'ab')
796  else:
797  self._stdout=FileOpener(str(stdout),'wb')
798  else:
799  self._stdout=StreamReuser(stdout)
800  return self
801 
802  def err2out(self):
803  """!Sends stderr to stdout
804  @returns self"""
805  if self._stderr is not None:
806  raise MultipleStderr(
807  'More than one stderr detected in call to Runner.err')
808  self._stderr=OutIsError()
809  return self
810 
def err(self,stderr,append=False):
    """!Redirects the stderr of this process to a file or stream.

    A string (basestring) argument is treated as a filename and is
    wrapped in a FileOpener using mode 'ab' when append is true and
    'wb' otherwise; the FileOpener is also given True as its third
    argument.  Any other object is wrapped in a StreamReuser.
    @param stderr a filename, or a stream-like object
    @param append if True, append to the file, otherwise truncate.
      Only consulted when stderr is a filename.
    @returns self
    @throws MultipleStderr if a stderr destination was already set"""
    if self._stderr is not None:
        raise MultipleStderr(
            'More than one stderr detected in call to Runner.err')
    if not isinstance(stderr,basestring):
        # Not a filename: hand the object over as an existing stream.
        self._stderr=StreamReuser(stderr)
        return self
    mode='ab' if append else 'wb'
    self._stderr=FileOpener(str(stderr),mode,True)
    return self
832 
def _gen(self,pipeline,logger=None,next=None):
    """!Adds this Runner, and every Runner before it, to a Pipeline.

    The previous stage (self._prev), if any, is generated first by a
    recursive call, so stages are added to the pipeline from the
    input end to the output end.  The prerun callables are then
    invoked, the redirections and environment are gathered into
    keyword arguments, and pipeline._impl_add() is called for this
    stage.  The second argument to _impl_add() is True only when
    next is None, i.e. when no later stage requested this call.
    @param[out] pipeline the produtil.pipeline.Pipeline
    @param logger a logging.Logger for log messages
    @param next the next (output-direction) Runner in the pipeline"""

    upstream=self._prev
    if upstream is not None:
        upstream._gen(pipeline,logger=logger,next=self)
    elif logger is not None:
        logger.debug('GEN %s: recurse to %s'%(repr(self),repr(upstream)))

    hooks=self._prerun
    if hooks is not None:
        for hook in hooks:
            hook(self,logger=logger)

    if logger is not None:
        logger.debug('GEN %s: gen to %s with cmd=%s'%(
            repr(self),repr(pipeline),str(self._args[0])))

    kwargs={}

    def add_redirect(redirect,streamkey,sendkey,closekey,stringkey=None):
        # Translate one redirection's _gen_stream() tuple into
        # keyword arguments.  Only stdin (which supplies stringkey)
        # may carry a string; for the others it must be None.
        (string,stream,send,close)=redirect._gen_stream()
        if stringkey is None:
            assert(string is None)
        elif string is not None:
            kwargs[stringkey]=string
        if stream is not None:
            kwargs[streamkey]=stream
        if send is not None:
            kwargs[sendkey]=send
        if close is not None:
            kwargs[closekey]=close

    if self._stdin is not None:
        add_redirect(self._stdin,'stdin','sendin','closein','instring')
    if self._stdout is not None:
        add_redirect(self._stdout,'stdout','sendout','closeout')
    if self._stderr is not None:
        add_redirect(self._stderr,'stderr','senderr','closeerr')

    if self._env is not None:
        kwargs['env']=self._impl_make_env()
    if logger is not None:
        kwargs['logger']=logger
    pipeline._impl_add(self._args,(next is None),cd=self._cd,**kwargs)
881 
882 ########################################################################
883 
885  """!A copy-on-write version of Runner.
886 
887  This subclass of Runner is unmodifiable. It is meant to be used
888  for re-usable exe()-like objects. For example, if one wants an
889  object lsl that runs exe('ls')['-l'] with optional extra
890  arguments, one could do:
891 
892  lsl=ImmutableRunner(Runner('ls')['-l'])
893 
894  and then every time one does run(lsl[argument list]), it generates
895  a new object without modifying the original lsl, ensuring later
896  calls to lsl will have the same effect:
897 
898  lsl['/']
899  lsl['~']
900  lsl['/'] # prints the same as the first
901 
902  This is implemented by a copy-on-write method: if a modification
903  is requested, a Runner is returned with the requested
904  modifications."""
def __init__(self,args,**kwargs):
    """!Creates a new ImmutableRunner.

    The arguments are forwarded unchanged to Runner.__init__.  While
    construction is in progress self._init is True, which makes
    _init_runner() return self instead of a copy; the flag is reset
    to False afterwards, even if construction raises.  If the new
    object has a previous pipeline stage (self._prev), that stage is
    re-wrapped in an ImmutableRunner.
    @param args,kwargs passed to Runner.__init__"""
    try:
        self._init=True
        Runner.__init__(self,args,**kwargs)
        upstream=self._prev
        if upstream is not None:
            self._prev=ImmutableRunner(upstream)
    finally:
        self._init=False
917 
def copy(self,typeobj=None):
    """!Copies this runner by delegating to Runner.copy().

    Stream objects connected to stdin, stdout or stderr are not
    duplicated: the copy is connected to the same stream objects.
    @param typeobj the type of the output object; ImmutableRunner is
      used when this is None.  Do not use this unless you know what
      you're doing
    @returns a copy of self"""
    target=ImmutableRunner if typeobj is None else typeobj
    return Runner.copy(self,target)
927 
def runner(self):
    """!Returns a modifiable version of this object, obtained by
    copying it as a Runner.
    @returns the result of self.copy(Runner)"""
    modifiable=self.copy(Runner)
    return modifiable
def _init_runner(self):
    """!Do not call this function: it is an internal implementation
    function.  Selects the object that a modifying operation should
    act upon: self while self.__init__ is still running (self._init
    is true), otherwise a modifiable copy from self.runner().
    @returns a Runner, never None"""
    target=self if self._init else self.runner()
    assert(target is not None)
    assert(isinstance(target,Runner))
    return target
def copyenv(self):
    """!Creates a new Runner that is like self in all ways except
    that it uses the parent process environment.  The change is
    applied to the object returned by _init_runner().
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.copyenv()
def clearenv(self):
    """!Creates a new Runner which is like self in all ways except
    that it starts from an empty environment, apart from a few
    critical variables without which most programs cannot run
    (PATH, USER, LOGNAME and HOME are retained).  The change is
    applied to the object returned by _init_runner().
    @returns a new Runner"""
    modifiable=self._init_runner()
    return modifiable.clearenv()
def cd(self,cd):
    """!Returns a new Runner that is like self, except that it cd's
    to the target directory before running.  The directory must
    already exist before the program starts.  The change is applied
    to the object returned by _init_runner().
    @param cd the directory to cd into, which must already exist.
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.cd(cd)
def env(self,**kwargs):
    """!Returns a new Runner that is like self in all ways except
    that the specified environment variables are set.  The change
    is applied to the object returned by _init_runner().
    @param kwargs varname=value arguments of environment variables to set
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.env(**kwargs)
def pipeto(self,other):
    """!Returns a new Runner that is like self in all ways, except
    that it has been piped into the other Runner.  The pipe is set
    up on a modifiable copy obtained from self.runner().
    @param other the Runner to pipe into.
    @returns the new Runner"""
    modifiable=self.runner()
    return modifiable.pipeto(other)
def inp(self,stdin,string=False):
    """!Returns a new Runner that is like self in all ways except
    that it has a different stdin.  The change is applied to the
    object returned by _init_runner().
    @param stdin the stdin string or filename
    @param string if True, stdin is a string
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.inp(stdin,string)
def out(self,stdout,append=False):
    """!Returns a new Runner that is like self in all ways except
    with a different stdout.  The change is applied to the object
    returned by _init_runner().
    @param stdout the stdout filename
    @param append if True, append to the file, otherwise truncate
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.out(stdout,append)
def err(self,stderr,append=False):
    """!Returns a new Runner that is like self in all ways except
    with a different stderr.  The change is applied to the object
    returned by _init_runner().
    @param stderr the stderr filename
    @param append if True, append to the file, otherwise truncate
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.err(stderr,append)
def err2out(self):
    """!Returns a new Runner that is like self in all ways except
    that stderr is piped into stdout.  The change is applied to the
    object returned by _init_runner().
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.err2out()
def prerun(self,arg):
    """!Returns a new Runner that is like self in all ways except
    that a new prerun function has been added.  The change is
    applied to the object returned by _init_runner().
    @param arg the new prerun function
    @returns the new Runner
    @sa Runner.prerun()"""
    modifiable=self._init_runner()
    return modifiable.prerun(arg)
def __getitem__(self,args):
    """!Returns a new Runner that is like self in all ways except
    with new arguments.  Runner.__getitem__ is invoked explicitly on
    the object returned by _init_runner().
    @param args the new argument or arguments
    @returns the new Runner
    @sa Runner.__getitem__"""
    modifiable=self._init_runner()
    return Runner.__getitem__(modifiable,args)
def argins(self,index,arg):
    """!Returns a new Runner that is like self in all ways, except
    with the specified argument inserted.  The change is applied to
    the object returned by _init_runner().
    @param index the index to insert before
    @param arg the argument to insert
    @returns the new Runner"""
    modifiable=self._init_runner()
    return modifiable.argins(index,arg)
1013 
def _gen(self,pipeline,logger=None,next=None):
    """!Creates a Runner object that is a duplicate of this
    ImmutableRunner (via self.runner()), and calls its _gen function
    with the same arguments.
    @param pipeline the produtil.pipeline.Pipeline to generate
    @param logger a logging.Logger for log messages
    @param next the next Runner in the chain
    @returns whatever the duplicate's _gen() returns
    @sa Runner._gen()"""
    duplicate=self.runner()
    return duplicate._gen(pipeline,logger,next)
1022 
def setthreads(self,nthreads):
    """!Sets the number of threads requested by this program.  The
    threads attribute is assigned on the object returned by
    _init_runner(), and that object is returned.
    @param nthreads the number of threads to request
    @returns the Runner whose threads attribute was set"""
    modifiable=self._init_runner()
    modifiable.threads=nthreads
    return modifiable
def delthreads(self):
    """!Removes the request for threads.  Same as self.threads=1.
    The threads attribute is deleted on the object returned by
    _init_runner(), and that object is returned.
    @returns the Runner whose threads attribute was deleted"""
    modifiable=self._init_runner()
    del modifiable.threads
    return modifiable
def __init__(self, obj)
Creates a StringInput that sends the specified object to stdin.
Definition: prog.py:207
def isplainexe(self)
Returns true if this is simply an executable with arguments (no redirection, no prerun objects...
Definition: prog.py:524
def clearenv(self)
Instructs this command to start with an empty environment except for certain critical variables witho...
Definition: prog.py:636
Raised when converting a Runner or pipeline of Runners to a POSIX sh string.
Definition: prog.py:73
def repr_for_in(self)
Part of the implementation of Runner.__repr__, this returns the filename and ",string=False".
Definition: prog.py:192
def out
Specifies that this process sends output from its stdout stream to the given file or stream...
Definition: prog.py:780
def for_output(self)
Has no effect.
Definition: prog.py:125
def copy(self)
Creates a shallow copy of this object.
Definition: prog.py:157
def __getitem__(self, args)
Add one or more arguments to the executable.
Definition: prog.py:453
def copy(self)
Returns a shallow copy of this object.
Definition: prog.py:249
Sets up signal handlers to ensure a clean exit.
Definition: sigsafety.py:1
def __init__(self)
OutIsError constructor.
Definition: prog.py:273
def err2out(self)
Sends stderr to stdout.
Definition: prog.py:802
This is part of the internal implementation of Runner, used to convert it to a produtil.pipeline.Pipeline for execution.
Definition: prog.py:134
def pipeto(self, other)
Returns a new Runner that is like self in all ways, except that it has been piped into the other Runn...
Definition: prog.py:967
def repr_for_in(self)
Part of the implementation of Runner.__repr__.
Definition: prog.py:230
def __repr__(self)
Returns a string representation of this object as valid Python code.
Definition: prog.py:188
Raised when the caller specifies more than one source for the stdin of a Runner.
Definition: prog.py:52
def err
Specifies that this process sends output from its stderr stream to the given file or stream...
Definition: prog.py:811
Raised when one tries to specify the stdout, stderr or stdin to go to, or come from, more than one location.
Definition: prog.py:49
def first(self)
Returns the first Runner in this pipeline.
Definition: prog.py:413
def env(self, kwargs)
Sets environment variables for this Runner.
Definition: prog.py:668
def inp
Returns a new Runner that is like self in all ways except that it has a different stdin...
Definition: prog.py:973
def __or__(self, other)
Pipes this Runner to the other Runner.
Definition: prog.py:572
def repr_for_err(self)
Same as repr_for_out.
Definition: prog.py:201
Raised when the caller specifies an invalid input or output when piping a Runner into or out of anoth...
Definition: prog.py:61
def getthreads(self)
Returns the number of threads requested by this program.
Definition: prog.py:399
def out
Returns a new Runner that is like self in all ways except with a different stdout.
Definition: prog.py:979
def argins(self, index, arg)
Returns a new Runner that is like self in all ways, except with the specified argument inserted...
Definition: prog.py:1007
def copyenv(self)
Instructs this command to duplicate the parent process environment (the default). ...
Definition: prog.py:629
def intmode(self)
Returns an integer version of mode suitable for os.open.
Definition: prog.py:172
def to_shell(self)
Raises NotValidPosixSh to indicate that the stream cannot be represented as POSIX sh...
Definition: prog.py:253
def shstrok(s)
Returns True if the specified string can be expressed as a POSIX sh string, and false otherwise...
Definition: prog.py:95
Base class of exceptions that are raised when converting a Runner or pipeline of Runners to a POSIX s...
Definition: prog.py:65
def __init__(self, obj)
Creates a StreamReuser for the specified stream-like object.
Definition: prog.py:243
def to_shell(self)
Returns "2>&1".
Definition: prog.py:278
def _init_runner(self)
Do not call this function: it is an internal implementation function.
Definition: prog.py:931
def prerun(self, arg)
Adds a function or callable object to be called before running the program.
Definition: prog.py:419
def __repr__(self)
Returns a string representation of this object as valid Python code.
Definition: prog.py:222
def _impl_make_env(self)
This internal function generates information about the environment variables to be input to this proc...
Definition: prog.py:645
def __lshift__(self, stdin)
Sends the specified string into stdin.
Definition: prog.py:550
def __repr__(self)
Attempts to produce valid Python code to represent this Runnable.
Definition: prog.py:473
Raised when converting a Runner or pipeline of Runners to a POSIX sh string if there is an equal ("="...
Definition: prog.py:81
Raised when the caller specifies more than one destination for a Runner's stderr. ...
Definition: prog.py:58
def err
Returns a new Runner that is like self in all ways except with a different stderr.
Definition: prog.py:985
Converts a group of MPI ranks to a runnable command.
Definition: __init__.py:1
def to_shell(self)
Creates a POSIX sh representation of the part of the command that requests redirection.
Definition: prog.py:160
Base class of exceptions raised when a Runner is given arguments that make no sense.
Definition: prog.py:46
def __lt__(self, stdin)
Connects the given object to stdin, via inp(stdin,string=False).
Definition: prog.py:539
def to_shell(self)
Converts this object, if possible, to an echo command followed by a pipe ("|").
Definition: prog.py:226
def repr_for_out(self)
Part of the implementation of Runner.__repr__, this returns the filename and ",string=False".
Definition: prog.py:196
mode
how the file is being opened
Definition: prog.py:146
def copy
Creates a deep copy of this runner, except if stream objects are connected to stdin, stdout or stderr.
Definition: prog.py:918
Raised when trying to convert a pipeline of Runners to a POSIX sh string, if a redirection in the pip...
Definition: prog.py:69
def delthreads(self)
Removes the request for threads.
Definition: prog.py:406
This is part of the internal implementation of Runner, and is used to convert it to a produtil...
Definition: prog.py:117
def runner(self)
Returns self if self is modifiable, otherwise returns a modifiable copy of self.
Definition: prog.py:715
def copy
Returns a deep copy of this object, almost.
Definition: prog.py:602
def repr_for_err(self)
Returns the stderr value.
Definition: prog.py:128
def clearenv(self)
Creates a new Runner which is like self in all ways except that it uses an empty environment except f...
Definition: prog.py:947
def setthreads(self, nthreads)
Sets the number of threads requested by this program.
Definition: prog.py:1023
def err2out(self)
Returns a new Runner that is like self in all ways except that stderr is piped into stdout...
Definition: prog.py:991
def copy(self)
Returns a shallow copy of this object.
Definition: prog.py:215
def for_input(self)
Has no effect.
Definition: prog.py:122
def _stringify_arg(self, arg)
Returns a string representation of the given argument to convert it to an input to produtil...
Definition: prog.py:435
def __ge__(self, outerr)
Redirects stderr and stdout to the specified file, truncating it.
Definition: prog.py:566
def __gt__(self, stdout)
Connects the given object to stdout, truncating it if it is a file.
Definition: prog.py:544
def setthreads(self, nthreads)
Sets the number of threads requested by this program.
Definition: prog.py:402
Represents sending a string to a process's stdin.
Definition: prog.py:205
def delthreads(self)
Removes the request for threads.
Definition: prog.py:1028
def repr_for_in(self)
This should never be called.
Definition: prog.py:284
def prerun(self, arg)
Returns a new Runner that is like self in all ways except that a new prerun function has been added...
Definition: prog.py:995
obj
the stream-like object to reuse.
Definition: prog.py:246
def __pos__(self)
Sends stderr to stdout.
Definition: prog.py:562
def __eq__(self, other)
Is the other object an OutIsError?
Definition: prog.py:292
def shbackslash(s)
Given a Python str, returns a backslashed POSIX sh string, or raises NotValidPosixShString if that ca...
Definition: prog.py:105
def runner(self)
Returns a modifiable version of this object (as a Runner).
Definition: prog.py:928
def to_shell(self)
Returns a string that expresses this object as a POSIX sh shell command if possible, or raises a subclass of NotValidPosixSh if not.
Definition: prog.py:679
def __init__(self, args, kwargs)
Creates a new Runner.
Definition: prog.py:311
err
If True, this is for stderr.
Definition: prog.py:147
def __eq__(self, other)
Returns True if the other object is a Runner that is equal to this one, and False otherwise...
Definition: prog.py:509
def inp
Specifies that the first Runner in this pipeline takes input from the given file or string specified ...
Definition: prog.py:753
def cd(self, dirpath)
Requests that this process run in the specified directory.
Definition: prog.py:532
def env(self, kwargs)
Returns a new Runner that is like self in all ways except that the specified environment variables ar...
Definition: prog.py:961
def shvarok(s)
Returns True if the specified environment variable name is a valid POSIX sh variable name...
Definition: prog.py:86
def __getitem__(self, args)
Returns a new Runner that is like self in all ways except with new arguments.
Definition: prog.py:1001
def pipeto(self, other)
Specifies that this Runner will send its stdout to the other runner's stdin.
Definition: prog.py:721
obj
the object to send to stdin
Definition: prog.py:211
def __str__(self)
Alias for repr()
Definition: prog.py:470
Represents a single stage of a pipeline to execute.
Definition: prog.py:299
Raised when the caller specifies more than one destination for a Runner's stdout. ...
Definition: prog.py:55
def repr_for_in(self)
Returns repr(obj) where obj is the given stream-like object.
Definition: prog.py:262
Internal module that launches and monitors processes.
Definition: pipeline.py:1
def __init__(self, args, kwargs)
Creates a new ImmutableRunner.
Definition: prog.py:905
A copy-on-write version of Runner.
Definition: prog.py:884
def args(self)
Iterates over the executable and arguments of this command.
Definition: prog.py:597
def repr_for_out(self)
Part of the representation of Runner.__repr__.
Definition: prog.py:287
def copy(self)
Returns a new OutIsError object.
Definition: prog.py:275
Raised when converting a Runner or pipeline of Runners to a posix sh string if a Runner's executable ...
Definition: prog.py:77
filename
the name of the file being opened
Definition: prog.py:145
def copyenv(self)
Creates a new Runner that is like self in all ways except that it uses the parent process environment...
Definition: prog.py:942
def cd(self, cd)
Returns a new Runner that is like self, except that it cd's to the target directory before running...
Definition: prog.py:954
def __rshift__(self, stdout)
Appends stdout to the specified file.
Definition: prog.py:556
def __init__
FileOpener constructor.
Definition: prog.py:140
Instructs a Runner to send stderr to stdout.
Definition: prog.py:271
def repr_for_out(self)
Returns repr(obj) where obj is the given stream-like object.
Definition: prog.py:266
Arranges for a stream-like object to be sent to the stdout, stderr or stdin of a Runner.
Definition: prog.py:240
def argins(self, index, arg)
Inserts the specified argument before the given index.
Definition: prog.py:578