HWRF  trunk@4391
pipeline.py
1 """!Internal module that launches and monitors processes.
2 
3 Do not use this module directly: it is part of the internal
4 implementation of the produtil.prog and produtil.run modules. It
5 converts a produtil.prog.Runner object to processes, and monitors the
6 processes until they exit, sending and receiving data as needed. This
7 replaces the built-in "subprocess" module which is not capable of
8 general-purpose pipeline execution."""
9 
##@var __all__
# List of symbols exported by "from produtil.pipeline import *".
# Names that are not listed here (for example Pipeline, simple_run and
# NoMoreProcesses) are not exported by a star-import and have to be
# referenced through the module itself.
__all__ = [ "launch", "manage", "PIPE", "ERR2OUT", "kill_all",
            "kill_for_thread" ]
14 
class NoMoreProcesses(KeyboardInterrupt):
    """!Tells the calling thread that it should exit.

    Raised when the produtil.sigsafety package catches a fatal
    signal.  Because this class derives from KeyboardInterrupt, an
    "except Exception" clause does not catch it."""
    pass
18 
19 import os, signal, select, logging, sys, StringIO, time, errno, \
20  fcntl, threading, weakref, collections
21 import stat,errno,fcntl
22 
class Constant(object):
    """!A named constant: an object whose str() and repr() are fixed
    when it is constructed."""
    def __init__(self,s,r=None):
        """!Creates a named constant
        @param s the return value of __str__() = str(self)
        @param r the return value of __repr__() = repr(self).  When
          None, a representation of the form Constant('s')@0xID is
          generated from repr(s) and id(self)."""
        self.__s=s
        self.__r=r if r is not None else \
            'Constant(%s)@0x%x'%(repr(s),id(self))

    def __str__(self):
        """!Returns the s argument to the constructor."""
        return self.__s

    def __repr__(self):
        """!Returns the r argument of the constructor, or the
        generated default if r was None."""
        return self.__r
39 
##@var plock
# A global lock for this module.  pipe(), padd(), pclose() and
# pclose_all() hold it while they create or close file descriptors or
# modify pipes_to_close.
plock=threading.Lock()

##@var pipes_to_close
# Set of pipes that must be closed after forking to avoid deadlocks.
# Descriptors are added by padd(), removed by pclose(), and all closed
# by pclose_all().
pipes_to_close=set()

##@var PIPE
# Indicates that stdout, stdin or stderr should be a pipe.
PIPE=Constant('PIPE')

##@var ERR2OUT
# Request that stderr and stdout be the same stream.
ERR2OUT=Constant('ERR2OUT')
def unblock(stream,logger=None):
    """!Attempts to modify the given stream to be non-blocking.  This
    only works with streams that have an underlying POSIX fileno, such
    as those from open.

    Will re-raise any exception received, other than AttributeError
    and EnvironmentError.  Hence, I/O errors and attempts to make a
    non-fileno stream non-blocking will produce a False return value,
    while anything else will raise an exception.

    @param stream the stream to unblock: an integer file descriptor
      or an object with a fileno() method
    @param logger a logging.Logger for log messages
    @returns True on success, False otherwise."""
    # The stream must be the first argument and the result must be
    # returned: O_NONBLOCK is the flag to turn on, nothing is turned off.
    return call_fcntrl(stream,os.O_NONBLOCK,0,logger)
69 
def call_fcntrl(stream,on,off,logger=None):
    """!Internal function that implements unblock().

    Reads the file status flags of the stream with F_GETFL, turns on
    the bits in "on", turns off the bits in "off", and stores the
    result with F_SETFL.

    @param stream the stream to modify: an integer file descriptor or
      an object with a fileno() method
    @param on flags to turn on
    @param off flags to turn off
    @param logger a logging.Logger for messages
    @returns True on success, False otherwise."""
    # Step 1: find the file descriptor behind the stream.
    try:
        fd=stream if isinstance(stream,int) else stream.fileno()
    except (AttributeError,EnvironmentError) as ee:
        if logger is not None:
            logger.warning('%s: stream has no fileno, cannot switch to '
                           'non-blocking I/O: %s'%
                           (repr(stream),str(ee)),exc_info=True)
        return False

    # Step 2: read-modify-write the status flags.
    try:
        current=fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, (current|on) & ~off)
    except EnvironmentError as ee:
        if logger is not None:
            logger.error('%s: cannot switch to non-blocking I/O: %s'%
                         (repr(stream),str(ee)),exc_info=True)
        return False
    return True
98 
def pipe(logger=None):
    """!Creates a pipe whose two ends are flagged close-on-exec.

    FD_CLOEXEC is a file descriptor flag, so it is read and written
    with F_GETFD/F_SETFD.  (Passing it to F_SETFL, which handles file
    status flags, does not set close-on-exec.)  The rest of this
    module still closes its pipes explicitly in the child process and
    does not rely on this flag alone.

    Failure to set the flag is logged, if a logger is given, and is
    otherwise ignored; the pipe is returned regardless.

    @param logger a logging.Logger for log messages
    @returns the tuple (read_fd, write_fd) from os.pipe()"""
    with plock:
        (p1,p2)=os.pipe()
        for fd in (p1,p2):
            try:
                fdflags=fcntl.fcntl(fd,fcntl.F_GETFD)
                fcntl.fcntl(fd,fcntl.F_SETFD,fdflags|fcntl.FD_CLOEXEC)
            except EnvironmentError as ee:
                if logger is not None:
                    logger.warning(
                        'fd %d: cannot set close-on-exec: %s'
                        %(fd,str(ee)),exc_info=True)
    return (p1,p2)
109 
def padd(p):
    """!Registers a file descriptor in pipes_to_close, the set of
    descriptors that pclose_all() closes before exec.
    @param p the file descriptor"""
    plock.acquire()
    try:
        pipes_to_close.add(p)
    finally:
        plock.release()
115 
def pclose(i):
    """!Closes a file descriptor and forgets it: the descriptor is
    also removed from the set that must be closed on exec.  Errors
    from os.close (EnvironmentError) are ignored.
    @param i the file descriptor"""
    with plock:
        try:
            os.close(i)
        except EnvironmentError:
            # Best effort: the descriptor may already be closed.
            pass
        pipes_to_close.discard(i)
126 
def pclose_all(i=None,o=None,e=None,logger=None):
    """!Closes all file descriptors sent to padd, then empties the
    list of descriptors to close.

    Closing is best-effort, like pclose(): a descriptor that cannot
    be closed (for example because it is already closed) is skipped
    so that the remaining descriptors are still closed.  This matters
    because launch() calls this function in the forked child, where
    an exception would abort the cleanup.

    @param i my stdin, which should not be closed
    @param o my stdout, which should not be closed
    @param e my stderr, which should not be closed
    @param logger a logging.Logger for debug messages"""
    with plock:
        for p in pipes_to_close:
            if p==i or p==o or p==e:
                continue # one of my own streams; keep it open
            if logger is not None:
                logger.debug("In child, close old pipe fd %d"%p)
            try:
                os.close(p)
            except EnvironmentError as ee:
                if logger is not None:
                    logger.debug("In child, cannot close fd %d: %s"
                                 %(p,str(ee)))
        pipes_to_close.clear()
140 
def launch(cmd, env=None, stdin=None, stdout=None, stderr=None,
           debug=False, cd=None):
    """!Starts the specified command (a list), with the specified
    environment (or None to copy this process's environment).

    The process is created with os.fork().  The parent closes the
    descriptors that now belong to the child and returns.  The child
    redirects its standard streams, resets its signal handlers and
    replaces itself with the command.  The child never returns from
    this function: if anything fails before or during the exec, the
    error is logged and the child terminates through os._exit(2), so
    it cannot continue running a copy of the parent's code.

    @param stdin,stdout,stderr Specifies the stdin, stdout and stderr
    streams.  The special value PIPE means "make a pipe," and sending
    stderr=ERR2OUT requests redirection of stderr to stdout.
    Descriptors given here are closed in the parent after the fork.
    @param cd The optional "cd" argument specifies a directory to cd
    into, in the child process, before executing the command.  Of
    course, you shouldn't care about any of this because you should be
    using the produtil.run package.
    @param cmd the command to run
    @param env the subprocess's environment, or None to use mine
    @param debug if True, send debug messages
    @returns a tuple (pid, stdinP, stdoutP, stderrP): the child's
      process id followed by the parent's end of each pipe that was
      created, or None for a stream that is not a pipe.
    @throws TypeError if cd is neither a string nor None
    @throws ValueError if cd is the empty string"""

    if cd is not None and not isinstance(cd,basestring):
        raise TypeError(
            "In produtil.pipeline.launch, cd must be a string or None")
    if cd=='':
        raise ValueError(
            "In produtil.pipeline.launch, cd must not be the empty string.")

    # Naming: xxxP is the parent's end of a pipe, xxxC is the
    # descriptor the child will use for that stream.
    stdinP=None ; stdinC=None
    stdoutP=None ; stdoutC=None
    stderrP=None ; stderrC=None
    logger=logging.getLogger(cmd[0])
    if debug:
        logger.debug("Start %s"%(repr(cmd),))

    if stdin is PIPE:
        (stdinC,stdinP)=pipe(logger)
        if debug:
            logger.debug("Pipe for stdin: %d<==%d"%(stdinC,stdinP))
    else:
        stdinC=stdin
    if stdout is PIPE:
        (stdoutP,stdoutC)=pipe(logger)
        if debug:
            logger.debug("Pipe for stdout: %d<==%d"%(stdoutP,stdoutC))
    else:
        stdoutC=stdout
    if stderr is PIPE:
        (stderrP,stderrC)=pipe(logger)
        if debug:
            logger.debug("Pipe for stderr: %d<==%d"%(stderrP,stderrC))
    elif stderr is not ERR2OUT:
        stderrC=stderr

    pid=os.fork() # raises OSError if the fork fails
    if pid>0:
        # Parent process after successful fork: close everything that
        # now belongs to the child, remember our own pipe ends.
        if stdin is not None and stdin is not PIPE:
            if debug:
                logger.debug("Close stdin %d on parent."%stdin)
            pclose(stdin)
        if stdin is PIPE and stdinC is not None:
            if debug:
                logger.debug("Close stdinC %d on parent."%stdinC)
            pclose(stdinC)
            padd(stdinP)
        if stdout is not None and stdout is not PIPE:
            if debug:
                logger.debug("Close stdout %d on parent."%stdout)
            pclose(stdout)
        if stdout is PIPE and stdoutC is not None:
            if debug:
                logger.debug("Close stdoutC %d on parent."%stdoutC)
            pclose(stdoutC)
            padd(stdoutP)
        if stderr is not None and stderr is not PIPE and stderr is not ERR2OUT:
            if debug:
                logger.debug("Close stderr %d on parent."%stderr)
            pclose(stderr)
        if stderr is PIPE and stderrC is not None:
            if debug:
                logger.debug("Close stderrC %d on parent."%stderrC)
            pclose(stderrC)
            padd(stderrP)
        if debug:
            logger.debug("On parent, returning %s"%(
                    repr((pid, stdinP,stdoutP,stderrP))))
        return (pid, stdinP,stdoutP,stderrP)

    # We are in the child process.  Everything below is inside one
    # try block whose "finally" ends the process, so that no failure
    # can let the child fall back into the caller's code.
    try:
        if isinstance(cd,basestring):
            os.chdir(cd)

        # Close pipe ends inherited from earlier launches.
        pclose_all(i=stdin,o=stdout,e=stderr)

        if stdinP is not None:
            if debug:
                logger.debug("Close stdinP %d on child."%stdinP)
            pclose(stdinP)
        if stdinC is not None:
            if debug:
                logger.debug("Point stdin to stdinC %d on child and close original."%stdinC)
            os.dup2(stdinC,0)
            pclose(stdinC)

        if stdoutP is not None:
            if debug:
                logger.debug("Close stdoutP %d on child."%stdoutP)
            pclose(stdoutP)
        if stdoutC is not None:
            if debug:
                logger.debug("Point stdout to stdoutC %d on child and close original."%stdoutC)
            os.dup2(stdoutC,1)
            pclose(stdoutC)

        if stderr is ERR2OUT:
            if debug:
                logger.debug("Redirect stderr to stdout on child.")
            os.dup2(1,2)
        if stderrP is not None:
            if debug:
                logger.debug("Close stderrP %d on child."%stderrP)
            pclose(stderrP)
        if stderrC is not None:
            if debug:
                logger.debug("Point stderr to stderrC %d on child and close original."%stderrC)
            os.dup2(stderrC,2)
            pclose(stderrC)

        if debug:
            logger.debug("Reset signal handlers on child.")

        signal.signal(signal.SIGHUP,signal.SIG_DFL)
        signal.signal(signal.SIGTERM,signal.SIG_DFL)
        signal.signal(signal.SIGINT,signal.SIG_DFL)
        signal.signal(signal.SIGQUIT,signal.SIG_DFL)
        signal.signal(signal.SIGPIPE,signal.SIG_DFL)
        signal.signal(signal.SIGCHLD,signal.SIG_DFL)

        assert(cmd[0])

        if debug:
            logger.debug("Run %s %s on child"%(cmd[0], " ".join(cmd[1:])))
        # On success the exec calls do not return.
        if env is None:
            os.execvp(cmd[0],cmd)
        else:
            os.execvpe(cmd[0],cmd,env)
    except Exception as e:
        logger.error("%s: could not exec: %s"%(cmd[0],str(e)),exc_info=True)
    finally:
        # Only reached if the exec did not happen.  os._exit ends the
        # child immediately, without raising SystemExit through the
        # caller's stack.
        os._exit(2)
289 
def filenoify(f):
    """!Tries to convert f to a fileno.

    None and the special constants PIPE and ERR2OUT are returned
    unchanged, as is an integer; for anything else the result of
    f.fileno() is returned.

    @returns an integer UNIX file descriptor, or f itself if f is
      None, PIPE or ERR2OUT
    @param f ERR2OUT, PIPE, None, an integer fileno or a file-like
      object with a fileno() function."""
    if f is None or f is PIPE or f is ERR2OUT:
        return f
    return f if isinstance(f,int) else f.fileno()
298 
########################################################################
# Auto-killing processes

##@var _manage_set
# Mapping from a weak reference to a thread to the set of process ids
# that manage() is monitoring on behalf of that thread.
_manage_set=collections.defaultdict(set)

##@var _kill_all
# Should all processes be killed?  Used by the produtil.sigsafety module.
_kill_all=None

def kill_for_thread(th):
    """!Sends a TERM signal to all processes that the specified thread
    (a threading.Thread) is waiting for.

    The signalled process ids are then removed from the thread's
    entry in _manage_set.  The set is modified in place because a
    running manage() call holds a reference to the same set object.
    Errors from os.kill (EnvironmentError) are ignored.

    @param th the threading.Thread whose processes should be killed"""
    tht=weakref.ref(th) # manage() keys _manage_set by weak reference
    # Use get() so a thread that manages nothing does not gain an
    # entry in the defaultdict.
    pids=_manage_set.get(tht)
    if not pids:
        return
    killme=set(pids) # snapshot, so the set can change while we signal
    for p in killme:
        try:
            os.kill(p,signal.SIGTERM)
        except EnvironmentError:
            # Best effort: the process may already be gone.
            pass
    pids.difference_update(killme)
321 
def kill_all():
    """!Requests termination of all processes that this module is
    managing.

    This function only sets the module-level _kill_all flag.  Every
    manage() loop checks that flag: it sends SIGTERM to the processes
    it is still monitoring and raises NoMoreProcesses once its work
    is finished."""
    global _kill_all # without this, the assignment only makes a local
    _kill_all=True

########################################################################

def manage(proclist,inf=None,outf=None,errf=None,instr=None,logger=None,
           childset=None,sleeptime=None):
    """!Watches a list of processes, handles their I/O, returns when
    all processes have exited and all I/O is complete.

    Data read from outf is collected in the stdout string, and data
    read from errf is collected in the stderr string.

    @warning You should not be calling this function unless you are
    modifying the implementation of Pipeline.  Use the produtil.run
    module instead of calling launch() and manage().

    @param proclist the list of processes to watch
    @param inf the input file
    @param outf the output file
    @param errf the error file
    @param instr the input string, instead of an input file
    @param childset the set of child process ids; each process id is
      removed from it when that process exits
    @param sleeptime sleep time between checks of child processes
    @param logger Logs to the specified object, at level DEBUG, if a logger is
    specified.
    @returns a tuple containing the stdout string (or None), the
    stderr string (or None) and a dict mapping from process id to the
    return value from os.wait4 called on that process.
    @throws NoMoreProcesses if kill_all() has been called"""

    me=weakref.ref(threading.current_thread())
    ms=_manage_set[me]
    assert(proclist)
    ms.update(proclist)
    assert(ms)

    bufsize=1048576
    # Work list entries are [job,target]:
    #   job 0 = write instr to descriptor "target"
    #   job 1 = read from descriptor "target"
    #   job 2 = wait for process id "target"
    work=list()
    done=dict() # mapping from pid to wait4 return value
    outio=None
    errio=None
    haveio=False

    inf=filenoify(inf)
    outf=filenoify(outf)
    errf=filenoify(errf)

    if inf is not None:
        if instr is None:
            instr=""
        if logger is not None:
            logger.debug("Will write instr (%d bytes) to %d."
                         %(len(instr),inf))
        work.append([0,inf])
        unblock(inf,logger=logger)
        haveio=True

    if outf is not None:
        if logger is not None:
            logger.debug("Will read outstr from %d."%outf)
        work.append([1,outf])
        outio=StringIO.StringIO()
        unblock(outf,logger=logger)
        haveio=True

    if errf is not None:
        if logger is not None:
            logger.debug("Will read errstr from %d."%errf)
        work.append([1,errf])
        errio=StringIO.StringIO()
        unblock(errf,logger=logger)
        haveio=True

    for proc in proclist:
        if logger is not None:
            logger.debug("Monitor process %d."%proc)
        work.append([2,proc])

    nin=0 # number of bytes of instr written so far
    lastproc=time.time()
    forceclose=False
    while work:
        i=0
        didproc=False
        if _kill_all is not None:
            if logger is not None:
                logger.debug("Kill all processes.")
            for (job,tgt) in work:
                if job==2:
                    try:
                        os.kill(tgt,signal.SIGTERM)
                    except EnvironmentError as e:
                        # Best effort, as in kill_for_thread.
                        if logger is not None:
                            logger.debug("Cannot kill process %d: %s"
                                         %(tgt,str(e)))
        while i<len(work):
            (job,tgt)=work[i]
            assert(job==0 or job==1 or job==2)
            if job==0:
                if logger is not None:
                    logger.debug("Attempt a write of %d bytes to %d"
                                 %(len(instr)-nin,tgt))
                try:
                    n=os.write(tgt,instr[nin:])
                except EnvironmentError as e:
                    if e.errno==errno.EAGAIN or e.errno==errno.EWOULDBLOCK:
                        n=None # pipe is full; try again later
                    else:
                        raise
                if n:
                    if logger is not None:
                        logger.debug("Wrote %d bytes to %d."%(n,tgt))
                    nin+=n
                if nin>=len(instr):
                    if logger is not None:
                        logger.debug("Done writing all %d bytes; close %d."
                                     %(nin,tgt))
                    pclose(tgt)
                    work.pop(i)
                    continue # do not increment i
                if forceclose:
                    if logger is not None:
                        logger.debug("Force close of in %d due to timeout."
                                     %tgt)
                    pclose(tgt)
                    work.pop(i)
                    continue # do not increment i
            elif job==1:
                try:
                    if logger is not None:
                        logger.debug("Attempt a read from %d"%tgt)
                    s=os.read(tgt,bufsize)
                except EnvironmentError as e:
                    if e.errno==errno.EAGAIN or e.errno==errno.EWOULDBLOCK:
                        if logger is not None:
                            logger.debug("Error %s from %d - assume no data"
                                         %(str(e),tgt))
                        s=None
                    else:
                        raise
                if s=='':
                    # end of file
                    if logger is not None:
                        logger.debug("eof reading output %d"%tgt)
                    pclose(tgt)
                    work.pop(i)
                    continue # do not increment i
                if s is not None:
                    if logger is not None:
                        logger.debug("Read %d bytes from output %d"
                                     %(len(s),tgt))
                    # read something: store it in the buffer that
                    # belongs to this descriptor.  Anything that is
                    # not the stdout descriptor is the stderr one.
                    if tgt==outf:
                        outio.write(s)
                    else:
                        errio.write(s)
                if forceclose:
                    if logger is not None:
                        logger.debug("Force close of %d due to timeout."
                                     %tgt)
                    pclose(tgt)
                    work.pop(i)
                    continue # do not increment i
            elif job==2:
                if logger is not None:
                    logger.debug("Check process %d"%tgt)
                didproc=True
                r=os.wait4(tgt,os.WNOHANG)
                if r and ( r[0]!=0 or r[1]!=0 ):
                    if logger is not None:
                        logger.debug("Process %d exited"%tgt)
                    work.pop(i)
                    try:
                        ms.remove(tgt)
                    except (ValueError,KeyError,TypeError) as e:
                        if logger is not None:
                            logger.debug(
                                "Cannot remove pid %d from _manage_set: %s"
                                %(tgt,str(e)),exc_info=True)
                    if childset is not None:
                        try:
                            childset.remove(tgt)
                        except (ValueError,KeyError,TypeError) as e:
                            if logger is not None:
                                logger.debug(
                                    "Cannot remove pid %d from childset: %s"
                                    %(tgt,str(e)),exc_info=True)
                    done[tgt]=r
                    continue # do not increment i
                else:
                    if logger is not None:
                        logger.debug("Process %d still running"%tgt)
            i+=1
        if didproc:
            lastproc=time.time()
        else:
            # No process entries are left in the work list.
            now=time.time()
            if now-lastproc > 2 and work:
                if logger is not None:
                    logger.debug(
                        "No data two seconds after processes exited. "
                        "Forcing a close of all streams.")
                # No data in past two seconds and all processes have
                # exited.
                forceclose=True
                # NOTE(review): the listing this was restored from had
                # lost its indentation; this "continue" is assumed to
                # belong to the timeout branch - confirm.
                continue
        if work:
            if logger is not None:
                logger.debug("Bottom of loop with work=%s"%repr(work))
            if sleeptime:
                time.sleep(sleeptime)
            elif haveio:
                time.sleep(0.01)
            else:
                time.sleep(0.2)
    if logger is not None:
        logger.debug("Done monitoring pipeline.")

    outstr=None
    if outf is not None:
        outstr=outio.getvalue()
        outio.close()

    errstr=None
    if errf is not None:
        errstr=errio.getvalue()
        errio.close()

    if _kill_all is not None:
        raise NoMoreProcesses(
            "Master thread caught a signal. This thread should exit.")
    return (outstr, errstr, done)
545 
546 ########################################################################
547 
def simple_run(cmd, env=None, stdin=None, stdout=None, stderr=None,
               debug=False, cd=None, logger=None):
    """!Launches a single command, waits for it to finish and returns
    its exit status.

    @param cmd,env,stdin,stdout,stderr,debug,cd passed unmodified to
      launch()
    @param logger a logging.Logger passed to manage()
    @returns the exit status if the process exited, the negated
      signal number if it was terminated by a signal, and -128
      otherwise."""
    (pid, in_fd, out_fd, err_fd) = launch(
        cmd,env,stdin,stdout,stderr,debug,cd)
    managed = manage([pid], inf=in_fd, outf=out_fd, errf=err_fd,
                     logger=logger)
    (_out, _err, exits) = managed
    # Element 1 of the os.wait4 result is the wait status.
    status=exits[pid][1]
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    return -128
561 
562 ########################################################################
563 
class Pipeline(object):
    """!Wrapper around launch() and manage().  It converts Runner
    objects to calls to launch(), and runs manage() on the resulting
    processes."""

    def __init__(self,runner,capture=False,logger=None,debug=False):
        """!Pipeline constructor.  Initializes the bookkeeping and
        then calls runner._gen(self,logger=logger).
        @param runner the produtil.prog.Runner to convert
        @param capture if True, capture the stdout of the runner
        @param logger a logging.Logger for messages
        @param debug if True, send debug messages"""
        # Constructor options:
        self.__capture=capture
        self.__logger=logger
        self.__debug=debug
        # Launched processes: set of pids, and one
        # (pid,stdin,stdout,stderr) tuple per launch() call:
        self.__children=set()
        self.__quads=list()
        self.__last_pid=None
        # Parent-side descriptors: stdin of the first process, stdout
        # and stderr of the most recent one:
        self.__stdin=None
        self.__stdout=None
        self.__stderr=None
        # Input string and the results of manage():
        self.__instring=None
        self.__outstring=None
        self.__errstring=None
        self.__managed=None
        self.__lock=threading.Lock()
        runner._gen(self,logger=logger)

    def __repr__(self):
        """!Return a debug string representation of this Pipeline."""
        streams=(repr(self.__stdin),repr(self.__stdout),
                 repr(self.__stderr))
        return "<Pipeline id=0x%x in=%s out=%s err=%s>"%(
            (id(self),)+streams)

    def _impl_add(self,command,endpipe,logger=None,
                  instring=None,stdin=None,stdout=None,stderr=None,
                  sendout=None,senderr=None,sendin=None,env=None,
                  closein=None,closeout=None,closeerr=None,
                  cd=None):
        """!Adds another produtil.prog.Runner's contents to this Pipeline.
        @param command the command to run
        @param endpipe is this the end of a pipeline?
        @param logger a logging.Logger for log messages
        @param instring the string for stdin
        @param stdin a file, fileno or PIPE constant for stdin
        @param stdout a file, fileno or special constant for stdout
        @param stderr a file, fileno or special constant for stderr
        @param sendin,sendout,senderr when not None, used in place of
          stdin, stdout and stderr, respectively
        @param env subprocess environment variables
        @param closein,closeout,closeerr Unused.
        @param cd subprocess working directory"""
        pin = sendin if (sendin is not None) else stdin
        pout = sendout if (sendout is not None) else stdout
        perr = senderr if (senderr is not None) else stderr

        if instring is not None:
            # An input string is fed to the process through a pipe.
            pin=PIPE
            self.__instring=instring
        if pin is None and self.__stdout is not None:
            # Read from the stdout of the previously added process.
            pin=self.__stdout

        if not endpipe:
            pout=PIPE
        elif self.__capture and pout is None:
            pout=PIPE

        (pid,infd,outfd,errfd)=launch(
            command,env,pin,pout,perr,self.__debug,cd)

        self.__children.add(pid)
        if not self.__quads:
            # stdin of the whole pipeline = stdin of its first process
            self.__stdin=infd
        self.__stdout=outfd
        self.__stderr=errfd
        self.__quads.append( (pid,infd,outfd,errfd) )
        self.__last_pid=pid

    def send_signal(self,sig):
        """!Sends a signal to all children.  Errors from os.kill
        (EnvironmentError) are ignored.
        @param sig the signal"""
        for child in self.__children:
            try:
                os.kill(child,sig)
            except EnvironmentError:
                pass

    def terminate(self):
        """!Sends SIGTERM to all children."""
        self.send_signal(signal.SIGTERM)

    def kill(self):
        """!Sends SIGKILL to all children."""
        self.send_signal(signal.SIGKILL)

    def communicate(self,sleeptime=None):
        """!Writes to input, reads from output, waits for child
        processes, etc.  This is just a wrapper around the manage()
        function.  It will return immediately if self.communicate has
        already completed earlier.
        @param sleeptime the sleep time in seconds between checks"""
        with self.__lock:
            if self.__managed:
                return
            pids=[quad[0] for quad in self.__quads]
            (outstr,errstr,exits)=manage(
                pids,
                self.__stdin, self.__stdout, self.__stderr,
                self.__instring, self.__logger, self.__children,
                sleeptime)
            self.__managed=exits
            self.__outstring=outstr
            self.__errstring=errstr

    def poll(self):
        """!Returns the exit status of the last element of the
        pipeline.  If the process died due to a signal, returns a
        negative number.  Returns None if no results from
        communicate() are stored yet."""
        exits=self.__managed
        if not exits:
            return None
        # Element 1 of the stored os.wait4 result is the wait status.
        status=exits[self.__last_pid][1]
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            return -os.WTERMSIG(status)
        return -128

    def to_string(self):
        """!Calls self.communicate(), and returns the stdout from the
        pipeline (self.outstring).  The return value will be None if
        the pipeline was redirected to a file or if the constructor's
        capture option was not True."""
        self.communicate()
        return self.outstring

    @property
    def outstring(self):
        """!The stdout from the pipeline.  Will be None if the pipeline
        was redirected to a file, or if the constructor's capture
        option was not True."""
        return self.__outstring
A class used to implement named constants.
Definition: pipeline.py:23
def kill(self)
Sends SIGKILL to all children.
Definition: pipeline.py:644
def __repr__(self)
Returns the r argument of the constructor.
Definition: pipeline.py:36
def terminate(self)
Sends SIGTERM to all children.
Definition: pipeline.py:641
def manage
Watches a list of processes, handles their I/O, returns when all processes have exited and all I/O is...
Definition: pipeline.py:330
This class is a wrapper around launch and manage.
Definition: pipeline.py:564
def outstring(self)
The stdout from the pipeline.
Definition: pipeline.py:693
def pclose(i)
Closes a file descriptor, removing it from the list that must be closed on exec.
Definition: pipeline.py:116
def __init__
Pipeline constructor.
Definition: pipeline.py:568
def unblock
Attempts to modify the given stream to be non-blocking.
Definition: pipeline.py:55
def padd(p)
Adds a file descriptor to the list to close before exec.
Definition: pipeline.py:110
def kill_for_thread(th)
Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for...
Definition: pipeline.py:307
def communicate
Writes to input, reads from output, waits for child processes, etc.
Definition: pipeline.py:653
def launch
Starts the specified command (a list), with the specified environment (or None to copy this process's...
Definition: pipeline.py:142
def __repr__(self)
Return a debug string representation of this Pipeline.
Definition: pipeline.py:589
def call_fcntrl
Internal function that implements unblock()
Definition: pipeline.py:70
def __init__
Creates a named constant.
Definition: pipeline.py:25
def __str__(self)
Returns the s argument to the constructor.
Definition: pipeline.py:33
def pipe
Creates a pipe that will be closed on exec.
Definition: pipeline.py:99
def filenoify(f)
Tries to convert f to a fileno.
Definition: pipeline.py:290
def poll(self)
Returns the exit status of the last element of the pipeline.
Definition: pipeline.py:670
def kill_all()
Sends a TERM signal to all processes that this module is managing.
Definition: pipeline.py:322
Raised when the produtil.sigsafety package catches a fatal signal.
Definition: pipeline.py:15
def to_string(self)
Calls self.communicate(), and returns the stdout from the pipeline (self.outstring).
Definition: pipeline.py:684
def send_signal(self, sig)
Sends a signal to all children.
Definition: pipeline.py:633
def pclose_all
Closes all file descriptors sent to padd.
Definition: pipeline.py:127