HWRF  trunk@4391
workpool.py
1 """!Contains the WorkPool class, which maintains pools of threads
2 that perform small tasks."""
3 
##@var __all__
# Names exported by "from produtil.workpool import *"
__all__ = ["WorkPool", "WrongThread"]
7 import threading, collections, time
9 
class WrongThread(Exception):
    """!Raised when a thread that has nothing to do with a WorkPool
    tries to interact with it.  The only threads allowed to use a
    WorkPool are the thread that called its constructor and the
    worker threads the WorkPool itself created."""
15 
class WorkTask(object):
    """!One unit of work for a WorkPool worker thread.  This is an
    internal implementation class; do not use it directly.  It holds
    the callable to run, its argument list, a "done" flag, and the
    exception (if any) raised by the most recent attempt."""

    def __init__(self, work, args=None):
        """!Create a WorkTask whose job is to call work(*args)
        @param work the function to call
        @param args an iterable of positional arguments for work, or
          None for no arguments.  The arguments are copied into a
          new list."""
        self.work = work
        self.__done = False
        self.__exception = None
        if args is None:
            self.__args = []
        else:
            self.__args = list(args)

    ##@var work
    # The function this WorkTask should call

    @property
    def args(self):
        """!The list of arguments to pass to the work function"""
        return self.__args

    def _set_exception(self, e):
        """!Records the exception raised by the work function and
        marks the task as not done.
        @param e the exception
        @returns the exception e"""
        self.__done = False
        self.__exception = e
        return e

    def _del_exception(self):
        """!Forgets any exception recorded for the work function."""
        self.__exception = None

    def _get_exception(self):
        """!Returns the recorded exception, or None if the task is
        done or no exception has been recorded."""
        return None if self.__done else self.__exception

    ## The exception that was raised by the work function.
    exception = property(
        fget=_get_exception, fset=_set_exception, fdel=_del_exception,
        doc="""The exception raised by the last work attempt""")

    def _set_done(self, d):
        """!Sets the "done" versus "not done" state to bool(d).
        Marking the task done also discards any recorded exception.
        @param d the new state; any value, interpreted as a boolean
        @returns bool(d)"""
        finished = bool(d)
        self.__done = finished
        if finished:
            self.__exception = None
        return finished

    def _get_done(self):
        """!Is this task complete?"""
        return self.__done

    def _del_done(self):
        """!Marks the task as not done.  Unlike setting done to
        False through the property, nothing else is touched."""
        self.__done = False

    ## Is this work task done?
    done = property(
        fget=_get_done, fset=_set_done, fdel=_del_done,
        doc="""Is this work done?  True or False.""")
75 
def do_nothing():
    """!Intentionally empty.  Serves as the work function of the
    TERMINATE task that implements worker termination."""
    return None
78 
##@var TERMINATE
# Sentinel WorkTask that WorkPool places on its queue to make a worker
# thread exit.  Workers recognize it by identity ("is"), so it must
# not be modified or replaced.
TERMINATE = WorkTask(do_nothing)
83 
class WorkPool(object):
    """!A pool of threads that perform some list of tasks.  There is a
    function add_work() that adds a task to be performed.

    Only the thread that constructed the WorkPool (the "master") may
    add work, start threads, or kill threads.

    Example: print the numbers from 1 to 10 in no particular order,
    in three threads:
    @code
    def printit(num):
        print str(num)
    with WorkPool(3) as w:
        print "three threads are waiting for work"
        for x in xrange(10):
            w.add_work(printit,[x+1])
        print "all threads have work, but the work may not be complete"
        w.barrier()
        print "all work is now complete."
    print "once you get here, all workpool threads exited"
    @endcode"""

    def __init__(self, nthreads, logger=None):
        """!Create a WorkPool with the specified number of worker
        threads.  The calling thread becomes the master thread.
        @param nthreads number of worker threads to start, an integer
          that is at least 0.  With 0 threads, add_work() runs each
          task immediately in the calling thread.
        @param logger an object with logging.Logger-style info, debug,
          error and critical methods, or None to disable logging"""
        self._work_queue = collections.deque()
        self._work_semaphore = threading.Semaphore(0)
        self._barrier_set = set()
        self._barrier_condition = threading.Condition()
        self._threads = set()
        self._master = threading.current_thread()
        self._modlock = threading.Lock()
        # No workers exist yet.  start_threads() clears this flag.
        # (This must be "die", the attribute every other method uses.)
        self.die = True
        self._last_id = 0
        self.logger = logger
        try:
            self.start_threads(nthreads)
        except (Exception, KeyboardInterrupt):
            # Do not leave a partially started pool behind.
            self.kill_threads()
            raise

    ##@var logger
    # a logging.Logger for log messages

    ##@var die
    # If True, all threads should exit immediately.

    def __enter__(self):
        """!Does nothing.  Called from atop a "with" block.
        @returns self"""
        return self

    def __exit__(self, etype, value, traceback):
        """!Called at the bottom of a "with" block.  If no exception
        was raised, waits for all queued work to complete and then
        kills the threads.  If any exception is leaving the block,
        kills the threads without waiting at a barrier; a caught
        signal or KeyboardInterrupt is also logged at CRITICAL level.
        Returns None, so the exception always propagates.
        @param etype,value,traceback exception information"""
        if value is None:
            self.barrier()
        # NOTE(review): this relies on produtil.sigsafety being
        # imported at module level; confirm that import exists.
        elif isinstance(value, KeyboardInterrupt) \
                or isinstance(value, produtil.sigsafety.CaughtSignal):
            self._critical('Terminal signal caught.  Will try to kill '
                           'threads before exiting.')
        # Always kill the workers, even for exceptions such as
        # SystemExit: the worker threads are not daemon threads, so
        # leaving them blocked would keep the interpreter from exiting.
        self.kill_threads()

    def _log(self, level, message, **kwargs):
        """!Shared implementation of _info, _debug, _error, _critical.
        Prefixes the message with "[master]" or the worker thread's
        name and sends it to the logger method named by level.  Does
        nothing if there is no logger.
        @param level name of the logger method to call
        @param message the message to log
        @param kwargs passed unmodified to the logger method"""
        if self.logger is None:
            return
        me = threading.current_thread()
        who = 'master' if me == self._master else me.name
        getattr(self.logger, level)('[%s] %s' % (who, message), **kwargs)

    def _info(self, message):
        """!Log to INFO level
        @param message the message to log"""
        self._log('info', message)

    def _debug(self, message):
        """!Log to DEBUG level
        @param message the message to log"""
        self._log('debug', message)

    def _error(self, message, exc_info=False):
        """!Log to ERROR level
        @param message the message to log
        @param exc_info passed to the logger's exc_info argument"""
        self._log('error', message, exc_info=exc_info)

    def _critical(self, message, exc_info=False):
        """!Log to CRITICAL level
        @param message the message to log
        @param exc_info passed to the logger's exc_info argument"""
        self._log('critical', message, exc_info=exc_info)

    @property
    def nthreads(self):
        """!The number of worker threads."""
        return len(self._threads)

    def add_work(self, work, args=None):
        """!Adds a piece of work to be done.  It must be a callable
        object.  If there are no worker threads, the work() is called
        immediately in this thread.  The args are passed, if present.
        @param work a callable object
        @param args a list of arguments to the work function
        @throws WrongThread if called from any thread other than the
          master thread"""
        me = threading.current_thread()
        if me != self._master:
            raise WrongThread(
                "In WorkPool.add_work, thread %s is not the master "
                "thread and is not a work thread." % (str(me),))

        if self.nthreads < 1:
            if args is None:
                work()
            else:
                work(*args)
            return

        self._work_queue.append(WorkTask(work, args))
        self._debug("Added work %s" % (repr(work),))
        # Wake one worker only after the task is on the queue.
        self._work_semaphore.release()

    def _worker_exit_check(self):
        """!Return True if worker threads should keep running, False if
        they should exit."""
        # NOTE(review): the listing this was restored from skips one
        # line at this point (original line 217); confirm nothing
        # was lost.
        return not self.die

    def _valid_thread(self):
        """!Returns True if this is the thread that called the
        constructor or any worker thread.  Returns False otherwise."""
        me = threading.current_thread()
        # A membership test instead of a loop: workers call this while
        # the master may still be adding threads, and iterating a set
        # that is being resized raises RuntimeError.
        return me == self._master or me in self._threads

    def _worker_main(self):
        """!Main function for worker threads.  Do not call directly.
        Repeatedly waits for a task and runs it until the TERMINATE
        task is received or the die flag is set.  An Exception raised
        by a task is stored on the task and logged; the worker then
        continues with the next task.
        @throws WrongThread if the calling thread is not known to
          this WorkPool"""
        me = threading.current_thread()
        if not self._valid_thread():
            raise WrongThread(
                "In WorkPool._worker_main, thread %s is not the master "
                "thread and is not a work thread." % (str(me),))

        while self._worker_exit_check():
            ws = self._work_semaphore
            wq = self._work_queue
            assert(isinstance(wq, collections.deque))
            work = None
            try:
                self._debug('Ready for work.')
                ws.acquire()            # blocks until add_work releases
                work = wq.popleft()
                if work is TERMINATE:
                    self._debug('terminate')
                    return
                self._debug(' ... working ... ')
                work.work(*work.args)
                work.done = True
            except Exception as e:
                if work is None:
                    # The failure was in the pool machinery, not in a
                    # task, so there is nowhere to record it.
                    raise
                work.exception = e
                self._error('...failed.', exc_info=True)

    def start_threads(self, n):
        """!Starts n new threads.  Can only be called from the thread
        that made this object.
        @param n number of threads to start, an integer that is at
          least 0.  Nothing happens if n is 0.
        @throws WrongThread if called from any thread other than the
          master thread"""
        assert(isinstance(n, int))
        assert(n >= 0)
        if n == 0:
            return
        me = threading.current_thread()
        if me != self._master:
            raise WrongThread(
                "In WorkPool.start_threads, thread %s is not the master "
                "thread." % (str(me),))
        self.die = False
        for _ in range(n):
            with self._modlock:
                thread = None
                try:
                    thread = threading.Thread(target=self._worker_main)
                    # Register before start(): the worker checks that
                    # it is in self._threads as its first action.
                    self._threads.add(thread)
                    self._last_id += 1
                    thread.start()
                except (Exception, KeyboardInterrupt) as e:
                    if thread in self._threads:
                        self._error('ERROR: ' + str(e), exc_info=True)
                        self._threads.remove(thread)
                    raise

    def kill_threads(self):
        """!Kills all worker threads.  Can only be called from the
        thread that made this object.  Each worker finishes the task
        it is currently running, if any, and then exits; this function
        returns once every worker has been joined.  Work still on the
        queue is not run.
        @throws WrongThread if called from any thread other than the
          master thread"""
        me = threading.current_thread()
        if me != self._master:
            raise WrongThread(
                "In WorkPool.kill_threads, thread %s is not the master "
                "thread." % (str(me),))
        # Keep the workers looping until each one has consumed its own
        # TERMINATE task; otherwise unconsumed TERMINATE tasks would
        # be left on the queue.
        self.die = False
        wq = self._work_queue
        ws = self._work_semaphore
        with self._modlock:
            killme = set(self._threads)
            for thread in killme:
                if not isinstance(wq, collections.deque):
                    raise TypeError(
                        "self._work_queue should be a deque but it is a"
                        " %s %s" % (type(wq).__name__, repr(wq)))
                # Front of the queue, so it is seen before pending work.
                wq.appendleft(TERMINATE)
                ws.release()

        for thread in killme:
            self._debug("Kill worker thread %s" % (repr(thread),))
            # NOTE(review): the listing this was restored from skips
            # one line at this point (original line 323); confirm
            # nothing was lost.
            thread.join()

        # Every worker has exited, so the flag can now say so.
        self.die = True
        self._debug("Done killing worker threads.")

    def barrier(self):
        """!Waits for all threads to reach the barrier function.
        Returns immediately if there are no worker threads.

        When called by the master thread, adds one WorkTask per worker
        telling it to call self.barrier(), waits until every worker
        has done so, and then releases them all.  When called by a
        worker thread, registers the worker at the barrier and blocks
        until the master releases it.
        @throws WrongThread if the calling thread is neither the
          master thread nor a worker thread"""
        if self.nthreads <= 0:
            return

        me = threading.current_thread()
        if not self._valid_thread():
            raise WrongThread(
                "In WorkPool.barrier, thread %s is not the master "
                "thread and is not a work thread." % (str(me),))

        if me == self._master:
            self._debug('BARRIER (master)')
            with self._modlock:
                # First, tell all worker threads to call this function.
                # A worker waiting at the barrier takes no further
                # work, so each request goes to a different worker.
                self._debug('Request barrier on all threads.')
                for _ in range(len(self._threads)):
                    self.add_work(self.barrier)
                self._debug('Wait for all workers to reach barrier.')
                # Now wait for it to happen:
                while len(self._barrier_set) < len(self._threads):
                    time.sleep(0.01)
                # A worker registers and starts waiting while holding
                # the condition's lock, so once that lock is acquired
                # here, every registered worker is already waiting.
                with self._barrier_condition:
                    self._barrier_condition.notify_all()
                    self._barrier_set.clear()
        else:
            self._debug('BARRIER (worker)')
            with self._barrier_condition:
                self._barrier_set.add(me)
                self._barrier_condition.wait()
def do_nothing()
Does nothing.
Definition: workpool.py:76
Sets up signal handlers to ensure a clean exit.
Definition: sigsafety.py:1
def _worker_exit_check(self)
Return True if worker threads should keep running, False if they should exit.
Definition: workpool.py:214
def _debug(self, message)
Log to DEBUG level.
Definition: workpool.py:155
def checksig()
This should be called frequently from worker threads to determine if the main thread has received a s...
Definition: sigsafety.py:97
def __init__
Create a WorkPool with the specified number of worker threads.
Definition: workpool.py:103
def kill_for_thread(th)
Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for...
Definition: pipeline.py:307
die
If True, all threads should exit immediately.
Definition: workpool.py:278
def _critical
Log to CRITICAL level.
Definition: workpool.py:175
def barrier(self)
Waits for all threads to reach the barrier function.
Definition: workpool.py:329
def __exit__(self, etype, value, traceback)
Called at the bottom of a "with" block.
Definition: workpool.py:127
def add_work
Adds a piece of work to be done.
Definition: workpool.py:191
def start_threads(self, n)
Starts n new threads.
Definition: workpool.py:266
def _error
Log to ERROR level.
Definition: workpool.py:165
Base class of the exceptions thrown when a signal is caught.
Definition: sigsafety.py:58
def nthreads(self)
The number of worker threads.
Definition: workpool.py:187
A pool of threads that perform some list of tasks.
Definition: workpool.py:84
def args(self)
The arguments to the work function.
Definition: workpool.py:34
def kill_threads(self)
Kills all worker threads.
Definition: workpool.py:300
Stores a piece of work.
Definition: workpool.py:16
Raised when a thread unrelated to a WorkPool attempts to interact with the WorkPool.
Definition: workpool.py:10
logger
a logging.Logger for log messages
Definition: workpool.py:115
def _valid_thread(self)
Returns True if this is the thread that called the constructor or any worker thread.
Definition: workpool.py:220
def __enter__(self)
Does nothing.
Definition: workpool.py:124
Internal module that launches and monitors processes.
Definition: pipeline.py:1
work
The function this WorkTask should call.
Definition: workpool.py:25
def __init__
Create a WorkTask whose job is to call work()
Definition: workpool.py:21