1 """!Contains the WorkPool class, which maintains pools of threads
2 that perform small tasks."""
6 __all__=[
"WorkPool",
"WrongThread"]
7 import threading, collections, time
11 """!Raised when a thread unrelated to a WorkPool attempts to
12 interact with the WorkPool. Only the thread that called the
13 constructor, and the threads created by the WorkPool can interact with it."""
17 """!Stores a piece of work. This is an internal implementation
18 class. Do not use it directly. It stores one piece of work to be
19 done by a worker thread in a WorkPool."""
22 """!Create a WorkTask whose job is to call work()
23 @param work the function to call
24 @param args the arguments to work"""
28 self.
__args=list() if(args
is None)
else list(args)
35 """!The arguments to the work function"""
38 def _set_exception(self,e):
39 """!Sets the exception that was raised by the work function.
40 Sets the done status to False.
41 @returns the exception
42 @param e the exception."""
46 def _del_exception(self):
47 """!Removes the exception that was raised by the work function."""
49 def _get_exception(self):
50 """!Returns the exception that was raised by the work function."""
51 if self.
__done:
return None
55 exception=property(_get_exception,_set_exception,_del_exception,
56 """The exception raised by the last work attempt""")
58 def _set_done(self,d):
59 """!Sets the "done" versus "not done" state to d
60 @returns a boolean version of d"""
66 """!Is this task complete?"""
69 """!Same as self._set_done(False)"""
73 done=property(_get_done,_set_done,_del_done,
74 """Is this work done? True or False.""")
77 """!Does nothing. Used to implement worker termination."""
82 TERMINATE=WorkTask(do_nothing)
85 """!A pool of threads that perform some list of tasks. There is a
86 function add_work() that adds a task to be performed.
88 Example: print the numbers from 1 to 10 in no particular order, using three worker threads:
93 with WorkPool(3) as w:
94 print "three threads are waiting for work"
96 w.add_work(printit,[x+1])
97 print "all threads have work, but the work may not be complete"
99 print "all work is now complete."
100 print "once you get here, all workpool threads exited"
104 """!Create a WorkPool with the specified number of worker
105 threads. The nthreads must be at least 1."""
111 self.
_master=threading.current_thread()
118 except (Exception,KeyboardInterrupt)
as e:
125 """!Does nothing. Called from atop a "with" block."""
128 """!Called at the bottom of a "with" block. If no exception
129 was raised, and no "break" encountered, then waits for work to
130 complete, and then kills threads. Upon a fatal signal or
131 break, kills threads as quickly as possible.
132 @param etype,value,traceback exception information"""
136 elif isinstance(value,KeyboardInterrupt) \
138 self.
_critical(
'Terminal signal caught. Will try to kill '
139 'threads before exiting.')
141 elif isinstance(value,GeneratorExit) \
142 or isinstance(value,Exception):
145 def _info(self,message):
146 """!Log to INFO level
147 @param message the message to log"""
148 me=threading.current_thread()
149 if self.
logger is None:
return
151 self.logger.info(
'[master] '+message)
153 self.logger.info(
'[%s] %s'%(me.name,message))
155 def _debug(self,message):
156 """!Log to DEBUG level
157 @param message the message to log"""
158 me=threading.current_thread()
159 if self.
logger is None:
return
161 self.logger.debug(
'[master] '+message)
163 self.logger.debug(
'[%s] %s'%(me.name,message))
165 def _error(self,message,exc_info=False):
166 """!Log to ERROR level
167 @param message the message to log"""
168 me=threading.current_thread()
169 if self.
logger is None:
return
171 self.logger.error(
'[master] '+message,exc_info=exc_info)
173 self.logger.error(
'[%s] %s'%(me.name,message),exc_info=exc_info)
175 def _critical(self,message,exc_info=False):
176 """!Log to CRITICAL level
177 @param message the message to log"""
178 me=threading.current_thread()
179 if self.
logger is None:
return
181 self.logger.critical(
'[master] '+message,exc_info=exc_info)
183 self.logger.critical(
'[%s] %s'%(me.name,message),exc_info=exc_info)
188 """!The number of worker threads."""
192 """!Adds a piece of work to be done. It must be a callable
193 object. If there are no worker threads, the work() is called
194 immediately. The args are passed, if present.
195 @param work a callable object
196 @param args a list of arguments to the work function"""
197 me=threading.current_thread()
200 "In WorkPool.add_work, thread %s is not the master "
201 "thread and is not a work thread."%(str(me),))
210 self._work_queue.append(worktask)
211 self.
_debug(
"Added work %s"%(repr(work),))
212 self._work_semaphore.release()
214 def _worker_exit_check(self):
215 """!Return True if worker threads should keep running, False if they should exit."""
220 def _valid_thread(self):
221 """!Returns True if this is the thread that called the
222 constructor or any worker thread. Returns False otherwise."""
223 me=threading.current_thread()
224 if me==self.
_master:
return True
226 if t==me:
return True
229 def _worker_main(self):
230 """!Main function for worker threads. Do not call directly."""
231 me=threading.current_thread()
234 "In WorkPool._worker_main, thread %s is not the master "
235 "thread and is not a work thread."%(str(me),))
241 assert(isinstance(wq,collections.deque))
244 self.
_debug(
'Ready for work.')
247 if work
is TERMINATE:
250 self.
_debug(
' ... working ... ')
257 except Exception
as e:
262 self.
_error(
'...failed.',exc_info=
True)
267 """!Starts n new threads. Can only be called from the thread
268 that made this object.
269 @param n number of threads to start, an integer greater than 0"""
271 assert(isinstance(n,int))
273 me=threading.current_thread()
276 "In WorkPool.kill_threads, thread %s is not the master "
277 "thread."%(str(me),))
286 thread=threading.Thread(target=doit,args=[self])
288 self._threads.add(thread)
291 except (Exception,KeyboardInterrupt)
as e:
293 self.
_error(
'ERROR: '+str(e),exc_info=
True)
294 self._threads.remove(thread)
301 """!Kills all worker threads. Can only be called from the
302 thread that made this object."""
303 me=threading.current_thread()
306 "In WorkPool.kill_threads, thread %s is not the master "
307 "thread."%(str(me),))
313 for thread
in killme:
314 if not isinstance(wq,collections.deque):
316 "self._work_queue should be a deque but it is a"
317 " %s %s"%(type(wq).__name__,repr(wq)))
318 wq.appendleft(TERMINATE)
321 for thread
in killme:
322 self.
_debug(
"Kill worker thread %s"%(repr(thread),))
327 self.
_debug(
"Done killing worker threads.")
330 """!Waits for all threads to reach the barrier function. This
331 can only be called by the master thread.
333 Upon calling, the master thread adds a WorkTask for each
334 thread, telling the thread to call self.barrier(). Once all
335 threads have reached that point, the barrier returns in all threads."""
341 "In WorkPool.add_work, thread %s is not the master "
342 "thread and is not a work thread."%(str(me),))
344 me=threading.current_thread()
346 self.
_debug(
'BARRIER (master)')
349 self.
_debug(
'Request barrier on all threads.')
350 for i
in xrange(len(self.
_threads)):
352 self.
_debug(
'Wait for all workers to reach barrier.')
357 self._barrier_condition.notify_all()
358 self._barrier_set.clear()
360 self.
_debug(
'BARRIER (worker)')
364 self._barrier_set.add(me)
365 self._barrier_condition.wait()
368 "In WorkPool.barrier, thread %s is not the master thread "
369 "and is not a worker thread."%(str(me),))
def do_nothing()
Does nothing.
Sets up signal handlers to ensure a clean exit.
def _worker_exit_check(self)
Return True if worker threads should keep running, False if they should exit.
def _debug(self, message)
Log to DEBUG level.
def checksig()
This should be called frequently from worker threads to determine if the main thread has received a signal.
def __init__
Create a WorkPool with the specified number of worker threads.
def kill_for_thread(th)
Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for...
die
If True, all threads should exit immediately.
def _critical
Log to CRITICAL level.
def barrier(self)
Waits for all threads to reach the barrier function.
def __exit__(self, etype, value, traceback)
Called at the bottom of a "with" block.
def add_work
Adds a piece of work to be done.
def start_threads(self, n)
Starts n new threads.
def _error
Log to ERROR level.
Base class of the exceptions thrown when a signal is caught.
def nthreads(self)
The number of worker threads.
A pool of threads that perform some list of tasks.
def args(self)
The arguments to the work function.
def kill_threads(self)
Kills all worker threads.
Raised when a thread unrelated to a WorkPool attempts to interact with the WorkPool.
logger
a logging.Logger for log messages
def _valid_thread(self)
Returns True if this is the thread that called the constructor or any worker thread.
def __enter__(self)
Does nothing.
Internal module that launches and monitors processes.
work
The function this WorkTask should call.
def __init__
Create a WorkTask whose job is to call work()