1 """!Internal module that launches and monitors processes.
3 Do not use this module directly: it is part of the internal
4 implementation of the produtil.prog and produtil.run modules. It
5 converts a produtil.prog.Runner object to processes, and monitors the
6 processes until they exit, sending and receiving data as needed. This
7 replaces the built-in "subprocess" module which is not capable of
8 general-purpose pipeline execution."""
12 __all__ = [
"launch",
"manage",
"PIPE",
"ERR2OUT",
"kill_all",
16 """!Raised when the produtil.sigsafety package catches a fatal
17 signal. Indicates to callers that the thread should exit."""
19 import os, signal, select, logging, sys, StringIO, time, errno, \
20 fcntl, threading, weakref, collections
21 import stat,errno,fcntl
24 """!A class used to implement named constants."""
26 """!Creates a named constant
27 @param s the return value of __str__() = str(self)
28 @param r the return value of __repr__() = repr(self)"""
31 r=
'Constant(%s)@0x%x'%(repr(s),id(self))
34 """!Returns the s argument to the constructor."""
37 """!Returns the r argument of the constructor."""
42 plock=threading.Lock()
56 """!Attempts to modify the given stream to be non-blocking. This
57 only works with streams that have an underlying POSIX fileno.
60 Will re-raise any exception received, other than AttributeError
61 and EnvironmentError. Hence, I/O errors and attempts to make a
62 non-fileno stream non-blocking will produce a False return value,
63 while anything else will raise an exception.
65 @param stream the stream to unblock
66 @param logger a logging.Logger for log messages
67 @returns True on success, False otherwise."""
71 """!Internal function that implements unblock()
72 @param stream the stream to modify
73 @param on flags to turn on
74 @param off flags to turn off
75 @param logger a logging.Logger for messages
76 @returns True on success, False otherwise."""
78 if isinstance(stream,int):
82 except (AttributeError,EnvironmentError)
as ee:
83 if logger
is not None:
84 logger.warning(
'%s: stream has no fileno, cannot switch to '
85 'non-blocking I/O: %s'%
86 (repr(stream),str(ee)),exc_info=
True)
90 flags=fcntl.fcntl(fd, fcntl.F_GETFL)
91 fcntl.fcntl(fd, fcntl.F_SETFL, (flags|on) & ~off)
93 except EnvironmentError
as ee:
94 if logger
is not None:
95 logger.error(
'%s: cannot switch to non-blocking I/O: %s'%
96 (repr(stream),str(ee)),exc_info=
True)
100 """!Creates a pipe that will be closed on exec. However, it
101 does not seem to be reliably closed on exec, so there are other
102 workarounds in this module.
103 @param logger a logging.Logger for log messages"""
111 """!Adds a file descriptor to the list to close before exec.
112 @param p the file descriptor"""
114 pipes_to_close.add(p)
117 """!Closes a file descriptor, removing it from the list that must
118 be closed on exec.
119 @param i the file descriptor"""
123 except EnvironmentError
as e:
pass
124 if i
in pipes_to_close:
125 pipes_to_close.remove(i)
128 """!Closes all file descriptors sent to padd.
129 @param i my stdin, which should not be closed
130 @param o my stdout, which should not be closed
131 @param e my stderr, which should not be closed
132 @param logger a logging.Logger for debug messages"""
134 for p
in pipes_to_close:
135 if p!=i
and p!=o
and p!=e:
136 if logger
is not None:
137 logger.debug(
"In child, close old pipe fd %d"%p)
139 pipes_to_close.clear()
141 def launch(cmd, env=None, stdin=None, stdout=None, stderr=None,
142 debug=
False, cd=
None):
143 """!Starts the specified command (a list), with the specified
144 environment (or None to copy this process's environment).
146 @param stdin,stdout,stderr Specifies the stdin, stdout and stderr
147 streams. The special value PIPE means "make a pipe," and sending
148 stderr=ERR2OUT requests redirection of stderr to stdout.
149 @param cd The optional "cd" argument specifies a directory to cd
150 into, in the child process, before executing the command. Of
151 course, you shouldn't care about any of this because you should be
152 using the produtil.run package.
153 @param cmd the command to run
154 @param env the subprocess's environment, or None to use mine
155 @param debug if True, send debug messages"""
157 if cd
is not None and not isinstance(cd,basestring):
159 "In produtil.pipeline.launch, cd must be a string or None")
162 "In produtil.pipeline.launch, cd must not be the empty string.")
164 stdinP=
None ; stdinC=
None
165 stdoutP=
None ; stdoutC=
None
166 stderrP=
None ; stderrC=
None
167 logger=logging.getLogger(cmd[0])
168 global pipes_to_close
170 logger.debug(
"Start %s"%(repr(cmd),))
173 (stdinC,stdinP)=
pipe(logger)
175 logger.debug(
"Pipe for stdin: %d<==%d"%(stdinC,stdinP))
179 (stdoutP,stdoutC)=
pipe(logger)
181 logger.debug(
"Pipe for stdout: %d<==%d"%(stdoutP,stdoutC))
185 (stderrP,stderrC)=
pipe(logger)
187 logger.debug(
"Pipe for stderr: %d<==%d"%(stderrP,stderrC))
188 elif stderr
is not ERR2OUT:
195 if stdin
is not None and stdin
is not PIPE:
197 logger.debug(
"Close stdin %d on parent."%stdin)
199 if stdin
is PIPE
and stdinC
is not None:
201 logger.debug(
"Close stdinC %d on parent."%stdinC)
204 if stdout
is not None and stdout
is not PIPE:
206 logger.debug(
"Close stdout %d on parent."%stdout)
208 if stdout
is PIPE
and stdoutC
is not None:
210 logger.debug(
"Close stdoutC %d on parent."%stdoutC)
213 if stderr
is not None and stderr
is not PIPE
and stderr
is not ERR2OUT:
215 logger.debug(
"Close stderr %d on parent."%stderr)
217 if stderr
is PIPE
and stderrC
is not None:
219 logger.debug(
"Close stderrC %d on parent."%stderrC)
223 logger.debug(
"On parent, returning %s"%(
224 repr((pid, stdinP,stdoutP,stderrP))))
225 return (pid, stdinP,stdoutP,stderrP)
227 if isinstance(cd,basestring):
233 if stdinP
is not None:
235 logger.debug(
"Close stdinP %d on child."%stdinP)
237 if stdinC
is not None:
239 logger.debug(
"Point stdin to stdinC %d on child and close original."%stdinC)
243 if stdoutP
is not None:
245 logger.debug(
"Close stdoutP %d on child."%stdoutP)
247 if stdoutC
is not None:
249 logger.debug(
"Point stdout to stdoutC %d on child and close original."%stdoutC)
253 if stderr
is ERR2OUT:
255 logger.debug(
"Redirect stderr to stdout on child.")
257 if stderrP
is not None:
259 logger.debug(
"Close stderrP %d on child."%stderrP)
261 if stderrC
is not None:
263 logger.debug(
"Point stderr to stderrC %d on child and close original."%stderrC)
268 logger.debug(
"Reset signal handlers on child.")
270 signal.signal(signal.SIGHUP,signal.SIG_DFL)
271 signal.signal(signal.SIGTERM,signal.SIG_DFL)
272 signal.signal(signal.SIGINT,signal.SIG_DFL)
273 signal.signal(signal.SIGQUIT,signal.SIG_DFL)
274 signal.signal(signal.SIGPIPE,signal.SIG_DFL)
275 signal.signal(signal.SIGCHLD,signal.SIG_DFL)
281 logger.debug(
"Run %s %s on child"%(cmd[0],
" ".join(cmd[1:])))
283 os.execvp(cmd[0],cmd)
285 os.execvpe(cmd[0],cmd,env)
286 except Exception
as e:
287 logger.error(
"%s: could not exec: %s"%(cmd[0],str(e)),exc_info=
True)
291 """!Tries to convert f to a fileno
292 @returns an integer UNIX file descriptor
293 @param f ERR2OUT, PIPE, an integer fileno or a file-like object
294 with a fileno() function."""
295 if f
is ERR2OUT
or f
is PIPE
or f
is None:
return f
296 if isinstance(f,int):
return f
301 _manage_set=collections.defaultdict(set)
308 """!Sends a TERM signal to all processes that the specified thread
309 (a threading.Thread) is waiting for."""
311 killme=set(_manage_set[tht])
314 os.kill(p,signal.SIGTERM)
315 except EnvironmentError
as e:
318 _manage_set[th].remove(killme)
319 except (ValueError,KeyError,TypeError)
as e:
323 """!Sends a TERM signal to all processes that this module is
324 managing."""
329 def manage(proclist,inf=None,outf=None,errf=None,instr=None,logger=None,
330 childset=
None,sleeptime=
None):
331 """!Watches a list of processes, handles their I/O, returns when
332 all processes have exited and all I/O is complete.
334 @warning You should not be calling this function unless you are
335 modifying the implementation of Pipeline. Use the produtil.run
336 module instead of calling launch() and manage().
338 @param proclist the list of processes to watch
339 @param inf the input file
340 @param outf the output file
341 @param errf the error file
342 @param instr the input string, instead of an input file
343 @param childset the set of child process ids
344 @param sleeptime sleep time between checks of child processes
345 @param logger Logs to the specified object, at level DEBUG, if a logger is
346 specified.
347 @returns a tuple containing the stdout string (or None), the
348 stderr string (or None) and a dict mapping from process id to the
349 return value from os.wait4 called on that process."""
351 me=weakref.ref(threading.current_thread())
371 if logger
is not None:
372 logger.debug(
"Will write instr (%d bytes) to %d."
379 if logger
is not None:
380 logger.debug(
"Will read outstr from %d."%outf)
381 work.append([1,outf])
382 outio=StringIO.StringIO()
387 if logger
is not None:
388 logger.debug(
"Will read errstr from %d."%errf)
389 work.append([1,errf])
390 errio=StringIO.StringIO()
394 for proc
in proclist:
395 if logger
is not None:
396 logger.debug(
"Monitor process %d."%proc)
397 work.append([2,proc])
405 if _kill_all
is not None:
406 if logger
is not None:
407 logger.debug(
"Kill all processes.")
408 for (job,tgt)
in work:
413 assert(job==0
or job==1
or job==2)
415 if logger
is not None:
416 logger.debug(
"Attempt a write of %d bytes to %d"
417 %(len(instr)-nin,tgt))
419 n=os.write(tgt,instr[nin:])
420 except EnvironmentError
as e:
421 if e.errno==errno.EAGAIN
or e.errno==errno.EWOULDBLOCK:
426 if logger
is not None:
427 logger.debug(
"Wrote %d bytes to %d."%(n,tgt))
430 if logger
is not None:
431 logger.debug(
"Done writing all %d bytes; close %d."
437 if logger
is not None:
438 logger.debug(
"Force close of in %d due to timeout."
445 if logger
is not None:
446 logger.debug(
"Attempt a read from %d"%tgt)
447 s=os.read(tgt,bufsize)
448 except EnvironmentError
as e:
449 if e.errno==errno.EAGAIN
or e.errno==errno.EWOULDBLOCK:
450 if logger
is not None:
451 logger.debug(
"Error %s from %d - assume no data"
458 if logger
is not None:
459 logger.debug(
"eof reading output %d"%tgt)
464 if logger
is not None:
465 logger.debug(
"Read %d bytes from output %d"
470 if logger
is not None:
471 logger.debug(
"Force close of %d due to timeout."
477 if logger
is not None:
478 logger.debug(
"Check process %d"%tgt)
480 r=os.wait4(tgt,os.WNOHANG)
481 if r
and ( r[0]!=0
or r[1]!=0 ):
482 if logger
is not None:
483 logger.debug(
"Process %d exited"%tgt)
487 except (ValueError,KeyError,TypeError)
as e:
488 if logger
is not None:
490 "Cannot remove pid %d from _manage_set: %s"
491 %(tgt,str(e)),exc_info=
True)
492 if childset
is not None:
495 except (ValueError,KeyError,TypeError)
as e:
496 if logger
is not None:
498 "Cannot remove pid %d from childset: %s"
499 %(tgt,str(e)),exc_info=
True)
503 if logger
is not None:
504 logger.debug(
"Process %d still running"%tgt)
510 if now-lastproc > 2
and work:
511 if logger
is not None:
513 "No data two seconds after processes exited. "
514 "Forcing a close of all streams.")
520 if logger
is not None:
521 logger.debug(
"Bottom of loop with work=%s"%repr(work))
523 time.sleep(sleeptime)
528 if logger
is not None:
529 logger.debug(
"Done monitoring pipeline.")
533 outstr=outio.getvalue()
538 errstr=errio.getvalue()
541 if _kill_all
is not None:
543 "Master thread caught a signal. This thread should exit.")
544 return (outstr, errstr, done)
548 def simple_run(cmd, env=None, stdin=None, stdout=None, stderr=None,
549 debug=
False, cd=
None, logger=
None):
550 (pid, stdinP, stdoutP, stderrP) =
launch(
551 cmd,env,stdin,stdout,stderr,debug,cd)
552 (outstder, errstr, done) = \
553 manage([pid], inf=stdinP, outf=stdoutP, errf=stderrP, logger=logger)
555 if os.WIFEXITED(result):
556 return os.WEXITSTATUS(result)
557 elif os.WIFSIGNALED(result):
558 return -os.WTERMSIG(result)
565 """!This class is a wrapper around launch and manage. It converts
566 Runner objects to calls to "launch", and runs "manage" on the
567 resulting processes."""
568 def __init__(self,runner,capture=False,logger=None,debug=False):
569 """!Pipeline constructor
570 @param runner the produtil.prog.Runner to convert
571 @param capture if True, capture the stdout of the runner
572 @param logger a logging.Logger for messages
573 @param debug if True, send debug messages"""
587 self.
__lock=threading.Lock()
588 runner._gen(self,logger=logger)
590 """!Return a debug string representation of this Pipeline."""
591 return "<Pipeline id=0x%x in=%s out=%s err=%s>"%(
594 def _impl_add(self,command,endpipe,logger=None,
595 instring=
None,stdin=
None,stdout=
None,stderr=
None,
596 sendout=
None,senderr=
None,sendin=
None,env=
None,
597 closein=
None,closeout=
None,closeerr=
None,
599 """!Adds another produtil.prog.Runner's contents to this Pipeline.
600 @param command the command to run
601 @param endpipe is this the end of a pipeline?
602 @param logger a logging.Logger for log messages
603 @param instring the string for stdin
604 @param stdin a file, fileno or PIPE constant for stdin
605 @param stdout a file, fileno or special constant for stdout
606 @param stderr a file, fileno or special constant for stderr
607 @param env subprocess environment variables
608 @param closein,closeout,closeerr Unused.
609 @param cd subprocess working directory"""
610 pin = stdin
if (sendin
is None)
else sendin
611 pout = stdout
if (sendout
is None)
else sendout
612 perr = stderr
if (senderr
is None)
else senderr
614 if instring
is not None:
617 if pin
is None and self.
__stdout is not None:
620 if self.
__capture and endpipe
and pout
is None:
627 self.__children.add(p)
631 self.__quads.append( (p,i,o,e) )
634 """!Sends a signal to all children.
635 @param sig the signal"""
639 except EnvironmentError
as e:
pass
642 """!Sends SIGTERM to all children."""
645 """!Sends SIGKILL to all children."""
654 """!Writes to input, reads from output, waits for child
655 processes, etc. This is just a wrapper around the manage()
656 function. It will return immediately if self.communicate has
657 already completed earlier.
658 @param sleeptime the sleep time in seconds between checks"""
671 """!Returns the exit status of the last element of the
672 pipeline. If the process died due to a signal, returns a
673 negative number (the negated signal number)."""
675 if not m:
return None
677 if os.WIFEXITED(result):
678 return os.WEXITSTATUS(result)
679 elif os.WIFSIGNALED(result):
680 return -os.WTERMSIG(result)
685 """!Calls self.communicate(), and returns the stdout from the
686 pipeline (self.outstring). The return value will be None if
687 the pipeline was redirected to a file or if the constructor's
688 capture option was not True."""
694 """!The stdout from the pipeline. Will be None if the pipeline
695 was redirected to a file, or if the constructor's capture
696 option was not True."""
A class used to implement named constants.
def kill(self)
Sends SIGKILL to all children.
def __repr__(self)
Returns the r argument of the constructor.
def terminate(self)
Sends SIGTERM to all children.
def manage
Watches a list of processes, handles their I/O, returns when all processes have exited and all I/O is...
This class is a wrapper around launch and manage.
def outstring(self)
The stdout from the pipeline.
def pclose(i)
Closes a file descriptor, removing it from the list that must be closed on exec.
def __init__
Pipeline constructor.
def unblock
Attempts to modify the given stream to be non-blocking.
def padd(p)
Adds a file descriptor to the list to close before exec.
def kill_for_thread(th)
Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for...
def communicate
Writes to input, reads from output, waits for child processes, etc.
def launch
Starts the specified command (a list), with the specified environment (or None to copy this process's...
def __repr__(self)
Return a debug string representation of this Pipeline.
def call_fcntrl
Internal function that implements unblock()
def __init__
Creates a named constant.
def __str__(self)
Returns the s argument to the constructor.
def pipe
Creates a pipe that will be closed on exec.
def filenoify(f)
Tries to convert f to a fileno.
def poll(self)
Returns the exit status of the last element of the pipeline.
def kill_all()
Sends a TERM signal to all processes that this module is managing.
Raised when the produtil.sigsafety package catches a fatal signal.
def to_string(self)
Calls self.communicate(), and returns the stdout from the pipeline (self.outstring).
def send_signal(self, sig)
Sends a signal to all children.
def pclose_all
Closes all file descriptors sent to padd.