HWRF  trunk@4391
produtil.pipeline Namespace Reference

Detailed Description

Internal module that launches and monitors processes.

Do not use this module directly: it is part of the internal implementation of the produtil.prog and produtil.run modules. It converts a produtil.prog.Runner object to processes and monitors those processes until they exit, sending and receiving data as needed. This replaces the built-in "subprocess" module, which is not capable of general-purpose pipeline execution.

Classes

class  Constant
 A class used to implement named constants.
 
class  NoMoreProcesses
 Raised when the produtil.sigsafety package catches a fatal signal.
 
class  Pipeline
 This class is a wrapper around launch() and manage().
 

Functions

def unblock (stream, logger=None)
 Attempts to modify the given stream to be non-blocking.

def call_fcntrl (stream, on, off, logger=None)
 Internal function that implements unblock().

def pipe (logger=None)
 Creates a pipe that will be closed on exec.

def padd (p)
 Adds a file descriptor to the list to close before exec.

def pclose (i)
 Closes a file descriptor, removing it from the list that must be closed on exec.

def pclose_all (i=None, o=None, e=None, logger=None)
 Closes all file descriptors sent to padd().

def launch (cmd, env=None, stdin=None, stdout=None, stderr=None, debug=False, cd=None)
 Starts the specified command (a list), with the specified environment (or None to copy this process's environment).

def filenoify (f)
 Tries to convert f to a fileno.

def kill_for_thread (th)
 Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for.

def kill_all ()
 Sends a TERM signal to all processes that this module is managing.

def manage (proclist, inf=None, outf=None, errf=None, instr=None, logger=None, childset=None, sleeptime=None)
 Watches a list of processes, handles their I/O, and returns when all processes have exited and all I/O is complete.

def simple_run


Variables

list __all__
 List of symbols exported by "from produtil.pipeline import *".

plock = threading.Lock()
 A global lock for this module.

pipes_to_close = set()
 Set of pipes that must be closed after forking to avoid deadlocks.

PIPE = Constant('PIPE')
 Indicates that stdout, stdin or stderr should be a pipe.

ERR2OUT = Constant('ERR2OUT')
 Request that stderr and stdout be the same stream.

_manage_set = collections.defaultdict(set)
 Auto-killing processes.

_kill_all = None
 Should all processes be killed? Used by the produtil.sigsafety module.
 

Function Documentation

def produtil.pipeline.call_fcntrl (stream, on, off, logger = None)

Internal function that implements unblock().

Parameters
stream: the stream to modify
on: flags to turn on
off: flags to turn off
logger: a logging.Logger for messages
Returns
True on success, False otherwise.

Definition at line 70 of file pipeline.py.

Referenced by produtil.pipeline.pipe() and produtil.pipeline.unblock().
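A minimal sketch of the flag-toggling step, assuming the on and off flags are file status flags (such as os.O_NONBLOCK) read and written with fcntl's F_GETFL and F_SETFL commands. The name toggle_status_flags is hypothetical and the body is an illustration, not the produtil code; it only mirrors the documented contract of returning True on success and False otherwise.

```python
import fcntl
import os


def toggle_status_flags(stream, on=0, off=0, logger=None):
    """Hypothetical stand-in for call_fcntrl(): set the bits in `on`,
    clear the bits in `off`, and report success as True/False."""
    try:
        # Accept either a raw integer descriptor or a file-like object.
        fd = stream if isinstance(stream, int) else stream.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, (flags | on) & ~off)
        return True
    except (AttributeError, EnvironmentError) as e:
        # No fileno() method, or the OS rejected the request.
        if logger is not None:
            logger.debug("cannot change flags on %r: %s", stream, e)
        return False
```

For example, toggle_status_flags(fd, on=os.O_NONBLOCK) makes a descriptor non-blocking, and passing the same flag as off restores blocking mode.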

def produtil.pipeline.filenoify (f)

Tries to convert f to a fileno.

Returns
an integer UNIX file descriptor
Parameters
f: ERR2OUT, PIPE, an integer fileno, or a file-like object with a fileno() method.

Definition at line 290 of file pipeline.py.

Referenced by produtil.pipeline.manage().
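The conversion can be sketched as follows, assuming that integers are already descriptors and that anything else must provide a fileno() method. The name filenoify_sketch is hypothetical, and the sketch deliberately leaves out the PIPE and ERR2OUT sentinels, whose handling is not described here.

```python
import tempfile


def filenoify_sketch(f):
    """Hypothetical helper: return an integer UNIX file descriptor for f."""
    if isinstance(f, int):
        return f  # already a file descriptor
    fileno = getattr(f, "fileno", None)
    if fileno is None:
        raise TypeError("cannot convert %r to a file descriptor" % (f,))
    return fileno()  # e.g. objects returned by open() or tempfile


# Usage: a file object and its raw descriptor map to the same integer.
with tempfile.TemporaryFile() as tmp:
    assert filenoify_sketch(tmp) == filenoify_sketch(tmp.fileno())
```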

def produtil.pipeline.kill_for_thread (th)

Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for.

Definition at line 307 of file pipeline.py.

Referenced by produtil.workpool.WorkPool.kill_threads().
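One way to know which processes a thread is waiting for is a registry keyed by thread. This sketch assumes, as the name and type of the _manage_set variable suggest but the documentation does not confirm, a collections.defaultdict(set) mapping each threading.Thread to the process IDs it waits on. register_child and kill_for_thread_sketch are hypothetical names.

```python
import collections
import os
import signal
import threading

_lock = threading.Lock()
# Assumption: maps a threading.Thread to the set of child PIDs it waits on.
_children_by_thread = collections.defaultdict(set)


def register_child(pid, thread=None):
    """Record that `thread` (default: the calling thread) waits on `pid`."""
    with _lock:
        _children_by_thread[thread or threading.current_thread()].add(pid)


def kill_for_thread_sketch(th):
    """Send SIGTERM to every process the given thread is waiting for."""
    with _lock:
        # .get() avoids creating an empty entry for unknown threads.
        pids = list(_children_by_thread.get(th, ()))
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # the process already exited
```

Copying the PID set while holding the lock and signalling outside it keeps the critical section short.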

def produtil.pipeline.launch (cmd, env = None, stdin = None, stdout = None, stderr = None, debug = False, cd = None)

Starts the specified command (a list), with the specified environment (or None to copy this process's environment).

Parameters
stdin, stdout, stderr: the stdin, stdout and stderr streams. The special value PIPE means "make a pipe," and sending stderr=ERR2OUT requests redirection of stderr to stdout.
cd: an optional directory to cd into, in the child process, before executing the command
cmd: the command to run, as a list
env: the subprocess's environment, or None to copy this process's environment
debug: if True, send debug messages

Of course, you should not need any of this, because you should be using the produtil.run module.

Definition at line 142 of file pipeline.py.

Referenced by produtil.pipeline.Pipeline.__repr__(), produtil.pipeline.manage(), and produtil.pipeline.pclose_all().
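The fork/exec pattern behind a launcher like launch() can be sketched as follows. This is an illustration under stated assumptions, not the produtil implementation: launch_sketch is a hypothetical name, only stdout can be piped, and stdin, stderr, ERR2OUT and debug are left out. The cd argument is applied with os.chdir in the child only, so the parent's working directory never changes.

```python
import os
import sys


def launch_sketch(cmd, env=None, pipe_stdout=False, cd=None):
    """Hypothetical fork/exec launcher.

    Returns (pid, fd) where fd is the read end of the child's stdout
    pipe, or None if pipe_stdout is False.
    """
    rfd = wfd = None
    if pipe_stdout:
        rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        try:
            if wfd is not None:
                os.dup2(wfd, 1)  # stdout now writes into the pipe
                os.close(rfd)
                os.close(wfd)
            if cd is not None:
                os.chdir(cd)  # change directory only in the child
            if env is None:
                os.execvp(cmd[0], cmd)  # inherit this process's environment
            else:
                os.execvpe(cmd[0], cmd, env)
        finally:
            os._exit(127)  # reached only if chdir or exec failed
    # Parent: close the write end so the reader sees EOF when the child exits.
    if wfd is not None:
        os.close(wfd)
    return pid, rfd


if __name__ == "__main__":
    # Usage: run a Python one-liner in "/" and capture what it prints.
    pid, fd = launch_sketch([sys.executable, "-c", "import os; print(os.getcwd())"],
                            pipe_stdout=True, cd="/")
    with os.fdopen(fd, "rb") as reader:
        print(reader.read())  # b'/\n'
    os.waitpid(pid, 0)
```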

def produtil.pipeline.manage (proclist, inf = None, outf = None, errf = None, instr = None, logger = None, childset = None, sleeptime = None)

Watches a list of processes, handles their I/O, and returns when all processes have exited and all I/O is complete.

Warning
You should not be calling this function unless you are modifying the implementation of Pipeline. Use the produtil.run module instead of calling launch() and manage().
Parameters
proclist: the list of processes to watch
inf: the input file
outf: the output file
errf: the error file
instr: the input string, instead of an input file
childset: the set of child process ids
sleeptime: sleep time between checks of child processes
logger: if specified, receives log messages at level DEBUG
Returns
a tuple containing the stdout string (or None), the stderr string (or None) and a dict mapping from process id to the return value from os.wait4 called on that process.

Definition at line 330 of file pipeline.py.

Referenced by produtil.pipeline.Pipeline.communicate() and produtil.pipeline.kill_all().
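A simplified version of the monitoring loop, assuming outf and errf are the read ends of two distinct pipes. It drains them with select.select until both reach end-of-file, then reaps each child with os.wait4, producing the documented (stdout, stderr, {pid: wait4 result}) shape. manage_sketch is a hypothetical name; it omits stdin feeding (inf, instr), the sleeptime polling interval and logging. Reading to EOF before waiting matters: a child blocked on writing to a full pipe would never exit if the parent waited first.

```python
import os
import select


def manage_sketch(pids, outf=None, errf=None):
    """Hypothetical I/O-and-wait loop.

    Reads outf/errf (distinct pipe read ends) until EOF, then reaps every
    PID. Returns (stdout bytes or None, stderr bytes or None,
    {pid: os.wait4 result}).
    """
    watched = [fd for fd in (outf, errf) if fd is not None]
    chunks = {fd: [] for fd in watched}
    while watched:
        # Block until at least one pipe has data or has reached EOF.
        ready, _, _ = select.select(watched, [], [])
        for fd in ready:
            data = os.read(fd, 65536)
            if data:
                chunks[fd].append(data)
            else:  # EOF: every writer has closed its end
                os.close(fd)
                watched.remove(fd)
    # All I/O is complete; collect exit status and resource usage.
    results = {pid: os.wait4(pid, 0) for pid in pids}
    out = b"".join(chunks[outf]) if outf is not None else None
    err = b"".join(chunks[errf]) if errf is not None else None
    return out, err, results
```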

def produtil.pipeline.padd (p)

Adds a file descriptor to the list to close before exec.

Parameters
p: the file descriptor

Definition at line 110 of file pipeline.py.

Referenced by produtil.pipeline.launch().

def produtil.pipeline.pclose (i)

Closes a file descriptor, removing it from the list that must be closed on exec.

Parameters
i: the file descriptor

Definition at line 116 of file pipeline.py.

Referenced by produtil.pipeline.launch() and produtil.pipeline.manage().

def produtil.pipeline.pclose_all (i = None, o = None, e = None, logger = None)

Closes all file descriptors sent to padd().

Parameters
i: this process's stdin, which should not be closed
o: this process's stdout, which should not be closed
e: this process's stderr, which should not be closed
logger: a logging.Logger for debug messages

Definition at line 127 of file pipeline.py.

Referenced by produtil.pipeline.launch().
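The bookkeeping behind padd(), pclose() and pclose_all() can be sketched with a lock and a set named after the documented plock and pipes_to_close variables. The function bodies are assumptions and the _sketch names are hypothetical. Since pclose_all() may run in a forked child, note that taking a lock there is only safe if no other thread held it at fork time.

```python
import os
import threading

plock = threading.Lock()  # guards pipes_to_close
pipes_to_close = set()    # descriptors a forked child should close


def padd_sketch(fd):
    """Remember fd so that a freshly forked child can close it."""
    with plock:
        pipes_to_close.add(fd)


def pclose_sketch(fd):
    """Close fd and forget it."""
    with plock:
        pipes_to_close.discard(fd)
    os.close(fd)


def pclose_all_sketch(i=None, o=None, e=None):
    """Close every remembered fd except the given stdin/stdout/stderr."""
    keep = {i, o, e}
    with plock:
        doomed = [fd for fd in pipes_to_close if fd not in keep]
        pipes_to_close.difference_update(doomed)
    for fd in doomed:
        try:
            os.close(fd)
        except OSError:
            pass  # already closed
```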

def produtil.pipeline.pipe (logger = None)

Creates a pipe that will be closed on exec.

However, the pipe does not seem to be reliably closed on exec, so this module uses other workarounds as well, such as the padd() and pclose_all() bookkeeping.

Parameters
logger: a logging.Logger for log messages

Definition at line 99 of file pipeline.py.

Referenced by produtil.pipeline.launch().
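A close-on-exec pipe can be created by setting FD_CLOEXEC with fcntl, as in this sketch (cloexec_pipe_sketch is a hypothetical name). Since Python 3.4 (PEP 446), os.pipe() already returns non-inheritable descriptors, so the explicit fcntl call mainly matters on older interpreters. FD_CLOEXEC only acts at exec time: between fork() and exec() the child still holds both ends, which is one plausible reason extra workarounds are needed.

```python
import fcntl
import os


def cloexec_pipe_sketch():
    """Hypothetical: return (read_fd, write_fd) with FD_CLOEXEC set on both."""
    r, w = os.pipe()
    for fd in (r, w):
        # FD_CLOEXEC is a descriptor flag, so use F_GETFD/F_SETFD.
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    return r, w
```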

def produtil.pipeline.unblock (stream, logger = None)

Attempts to modify the given stream to be non-blocking.

This only works with streams that have an underlying POSIX fileno, such as those returned by open().

Any exception other than AttributeError and EnvironmentError is re-raised. Hence, I/O errors and attempts to make a stream without a fileno non-blocking produce a False return value, while anything else raises an exception.

Parameters
stream: the stream to unblock
logger: a logging.Logger for log messages
Returns
True on success, False otherwise.

Definition at line 55 of file pipeline.py.

Referenced by produtil.pipeline.manage().
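A sketch of the documented error policy: AttributeError (no fileno() method) and EnvironmentError (an I/O failure) become a False return, while any other exception propagates. unblock_sketch is a hypothetical name, and setting O_NONBLOCK through fcntl is an assumption about how the flag is applied.

```python
import fcntl
import os


def unblock_sketch(stream, logger=None):
    """Hypothetical: set O_NONBLOCK on stream's descriptor.

    Returns False for AttributeError (no fileno) or EnvironmentError
    (I/O failure); any other exception propagates to the caller.
    """
    try:
        fd = stream.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        return True
    except (AttributeError, EnvironmentError) as e:
        if logger is not None:
            logger.debug("cannot unblock %r: %s", stream, e)
        return False
```

Once an unbuffered pipe reader is unblocked, reading with no data available returns None instead of hanging, so a polling loop can service several streams without stalling on any one of them.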

Variable Documentation

produtil.pipeline.__all__
Initial value:
= [ "launch", "manage", "PIPE", "ERR2OUT", "kill_all",
    "kill_for_thread" ]

List of symbols exported by "from produtil.pipeline import *".

Definition at line 12 of file pipeline.py.

produtil.pipeline._kill_all = None

Should all processes be killed? Used by the produtil.sigsafety module.

Definition at line 305 of file pipeline.py.

produtil.pipeline.ERR2OUT = Constant('ERR2OUT')

Request that stderr and stdout be the same stream.

Definition at line 54 of file pipeline.py.

produtil.pipeline.PIPE = Constant('PIPE')

Indicates that stdout, stdin or stderr should be a pipe.

Definition at line 50 of file pipeline.py.
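Sentinels such as PIPE and ERR2OUT work because each Constant instance is a unique object that callers compare by identity. A minimal sketch, assuming the class only needs to carry a printable name; NamedConstant and describe_stream are hypothetical names, and the produtil Constant class may differ.

```python
class NamedConstant(object):
    """Sketch of a named sentinel: unique identity plus a readable name."""

    def __init__(self, name):
        self.__name = name

    def __repr__(self):
        return self.__name

    __str__ = __repr__


# The sketch's own copies of the two documented sentinels.
PIPE = NamedConstant("PIPE")
ERR2OUT = NamedConstant("ERR2OUT")


def describe_stream(value):
    """Hypothetical helper showing identity tests against the sentinels."""
    if value is PIPE:
        return "pipe"
    if value is ERR2OUT:
        return "same as stdout"
    return "file or descriptor"
```

Testing with `is` rather than `==` guarantees that no user-supplied file object or integer can accidentally match a sentinel.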

produtil.pipeline.pipes_to_close = set()

Set of pipes that must be closed after forking to avoid deadlocks.

Definition at line 46 of file pipeline.py.

produtil.pipeline.plock = threading.Lock()

A global lock for this module.

Definition at line 42 of file pipeline.py.