HWRF  trunk@4391
datastore.py
"""!Stores products and tasks in an sqlite3 database file.

This module maintains an sqlite3 database file that stores information
about Products and Tasks.  A Product is a file or group of files
created by some Task.  Both Product and Task classes derive from
Datum, which is the base class of anything that can be stored in the
Datastore.

All database reads and writes go through a Transaction.  Unless locking
is disabled, the Datastore serializes access between threads with a
threading lock and between processes with a lock file that sits next
to the database file (the database filename plus ".lock")."""
8 
9 import sqlite3, threading, collections, re, contextlib, time, random,\
10  traceback, datetime, logging, os, time
12 
##@var __all__
# Public names exported by "from produtil.datastore import *".
__all__=[
    'DatumException','DatumLockHeld','InvalidID','InvalidOperation',
    'UnknownLocation',
    'FAILED','UNSTARTED','RUNNING','PARTIAL','COMPLETED',
    'Datastore','Transaction','Datum','CallbackExceptions',
    'Product','Task',
]
19 
class FakeException(Exception):
    """!Internal-only exception whose sole purpose is to capture a
    stack trace.  Code outside this module never sees it raised."""
    pass
23 
class DatumException(Exception):
    """!Common base class for the Datum-related exceptions defined in
    produtil.datastore."""
    pass
26 
class DatumLockHeld(Exception):
    """!Raised when a LockDatum is held by another Worker.

    Records who owns the lock, when the owner last checked in, and how
    long the lock stays valid.  The string form reports the time left
    before the lock expires."""
    def __init__(self,did,owner,owninfo,ownseen,ownlegacy,checktime):
        """!DatumLockHeld constructor.

        Every argument is normalized to str or int so that __str__ can
        format them with %s / %d.
        @param did the database ID of the datum whose lock is held
        @param owner the owner of the lock
        @param owninfo implementation-defined information about the owner
        @param ownseen last time the owner checked in
        @param ownlegacy length of time the lock is valid
        @param checktime suggestion of how often to check the lock"""
        self.did=str(did)
        self.owner=int(owner)
        self.owninfo=str(owninfo)
        self.ownseen=int(ownseen)
        self.ownlegacy=int(ownlegacy)
        self.checktime=int(checktime)
    ##@var did
    # The database ID of the datum whose lock is held

    ##@var owner
    # The owner of the lock

    ##@var owninfo
    # implementation-defined information about the owner

    ##@var ownseen
    # last time the owner checked in

    ##@var ownlegacy
    # length of time the lock is valid

    ##@var checktime
    # suggestion of how often to check the lock
    def __str__(self):
        """!Multi-line, human-readable description of the held lock."""
        def when(stamp):
            # Render an epoch timestamp as a UTC ctime() string.
            return datetime.datetime.utcfromtimestamp(stamp).ctime()
        remaining=self.ownseen+self.ownlegacy-self.checktime
        return '''Cannot lock Datum %s at %d (%s)
Was locked by worker %d (info %s)
Last seen %d = %s
Lock legacy time %d
Time left: %d''' % (self.did, self.checktime, when(self.checktime),
                    self.owner, self.owninfo,
                    self.ownseen, when(self.ownseen),
                    self.ownlegacy, remaining)
    def __repr__(self):
        """!Constructor-call-like representation of this exception."""
        fields=(self.did, self.owner, self.owninfo,
                self.ownseen, self.ownlegacy, self.checktime)
        return '%s(%s)' % (type(self).__name__,
                           ','.join(repr(field) for field in fields))
77 
class InvalidID(DatumException):
    """!Raised when a Datum or subclass receives a prodname or category name that is invalid."""

class InvalidOperation(DatumException):
    """!Raised when an invalid Datum operation is requested, such as delivering an UpstreamProduct."""

class UnknownLocation(DatumException):
    """!Raised when delivering data, but no location is provided."""
84 
##@var _has_dcolon
# Regular expression to detect a database ID with a double colon in it.
# re.DOTALL lets ".*" cross newlines, so a "::" that follows a newline
# is still detected.  Raw strings avoid invalid-escape warnings.
_has_dcolon=re.compile(r'\A.*::.*\Z',re.DOTALL)

##@var _has_dstar
# Regular expression to detect a database ID with a double asterisk in it.
_has_dstar=re.compile(r'\A.*\*\*.*\Z',re.DOTALL)
92 
##@var TASK_CATEGORY
# Special product category used for Tasks.
TASK_CATEGORY='**task**'

# Values stored in Task.state by the Task class.

##@var FAILED
# Task.state value: a run was attempted but failed.
FAILED=-10

##@var UNSTARTED
# Task.state value: no run was ever attempted, or a run was attempted
# but the task was cleaned up afterwards.
UNSTARTED=0

##@var RUNNING
# Task.state value: the task is presently running.
RUNNING=10

##@var PARTIAL
# Task.state value: the task was running but exited prematurely.
# Practically speaking, there is no way to tell the difference between
# RUNNING and PARTIAL since the job cannot ensure that it resets the
# state before unexpectedly exiting.
PARTIAL=20

##@var COMPLETED
# Task.state value: the task ran to completion successfully.
COMPLETED=30
133 
class Datastore(object):
    """!Stores information about Datum objects in a database.

    Stores and retrieves Datum objects from an sqlite3 database.  Uses
    file locking workarounds for bugs in RedHat Enterprise Linux's
    sqlite3 library, which is compiled without proper locking.  Has
    support for caching, and multithreading.  Each object in this
    database has a type, a string location, an integer "available"
    parameter, and an arbitrary list of (key,value) metadata pairs.
    This object can safely be accessed by multiple threads in the
    local process, and handles concurrency between processes via file
    locking."""
    def __init__(self,filename,logger=None,locking=True):
        """!Datastore constructor

        Creates a Datastore for the specified sqlite3 file.  Uses the
        given logging.Logger object for logging messages.  Set
        locking=False to disable all file locking.  That is generally
        unwise, and should only be used when reading the database.
        That functionality is supplied, and critical, for monitoring
        another user's jobs.  One cannot lock another user's file, so
        the "no locking" option is the only way to analyze the other
        user's simulation.
        @param filename the filename passed to sqlite3.connect
        @param logger a logging.Logger to use for logging messages
        @param locking should file locking be used?  It is unwise to
          turn off file locking.
        @warning Setting locking=False makes _lock() and _unlock()
          no-ops, so neither the thread lock nor the lock file is
          used.  This can lead to database corruption if two
          processes try to write at the same time.  This functionality
          is provided for the rare situation where you are unable to
          write to a database, such as when reading other users'
          sqlite3 database files."""
        # LockFile lives in produtil.locking; imported here so this
        # constructor does not rely on a module-level import.
        import produtil.locking
        self._logger=logger
        self.filename=filename
        self.db=None
        self._locking=locking
        self._connections=dict()           # thread ident -> sqlite3 connection
        self._map_lock=threading.Lock()    # guards _connections and _transtack
        self._db_lock=threading.Lock()     # serializes transactions between threads
        lockfile=filename+'.lock'
        if logger is not None:
            logger.debug('Lockfile is %s for database %s'%(lockfile,filename))
        # Serializes transactions between processes.
        self._file_lock=produtil.locking.LockFile(
            lockfile,logger=logger,max_tries=300,sleep_time=0.1,first_warn=50)
        self._transtack=collections.defaultdict(list)  # thread ident -> open Transactions
        with self.transaction():
            self._createdb(self._connection())
    ##@var db
    # The underlying sqlite3 database object

    ##@var filename
    # The path to the sqlite3 database file

    def _connection(self):
        """!Gets the current thread's database connection.  Each thread
        has its own connection, created on first use."""
        tid=threading.current_thread().ident
        with self._map_lock:
            con=self._connections.get(tid)
            if con is None:
                con=sqlite3.connect(self.filename)
                self._connections[tid]=con
            return con
    @contextlib.contextmanager
    def _mystack(self):
        """!Yields the transaction stack for the current thread while
        holding the map lock."""
        tid=threading.current_thread().ident
        with self._map_lock:
            yield self._transtack[tid]
    def _lock(self):
        """!Acquires the database lock for the current thread.

        Takes the in-process thread lock first, then the lock file.  If
        the lock file cannot be acquired, the thread lock is released
        before the exception propagates."""
        if not self._locking: return
        self._db_lock.acquire()
        try:
            self._file_lock.acquire()
        except BaseException:
            self._db_lock.release()
            raise
    def _unlock(self):
        """!Releases the database lock from the current thread.  If the
        current thread does not have the lock, the results are
        undefined.  The thread lock is released even if releasing the
        lock file raises."""
        if not self._locking: return
        try:
            self._file_lock.release()
        finally:
            self._db_lock.release()
    def transaction(self):
        """!Creates a Transaction on this database for the current
        thread.  The lock is taken when the Transaction is entered."""
        return Transaction(self)
    def _createdb(self,con):
        """!Creates the tables used by this Datastore.

        Runs "CREATE TABLE" commands in the sqlite3 database to create
        all tables needed by this class.  This code must be executed
        inside a transaction() and should only be executed on initial
        opening of the file, in the Datastore constructor.  It is safe
        to run this command twice on the same file --- the "IF NOT
        EXISTS" SQLite modifier is used to ensure the table will not
        be replaced.
        @param con the sqlite3 connection to use"""
        # String defaults use standard SQL single quotes; SQLite may be
        # built without support for double-quoted string literals.
        con.execute("""CREATE TABLE IF NOT EXISTS products ( id TEXT NOT NULL, available INTEGER DEFAULT 0, location TEXT DEFAULT '', type TEXT DEFAULT 'Product', PRIMARY KEY(id))""")
        con.execute('''CREATE TABLE IF NOT EXISTS metadata ( id TEXT NOT NULL, key TEXT NOT NULL, value TEXT, CONSTRAINT id_metakey PRIMARY KEY(id,key))''')
        con.execute('''CREATE TABLE IF NOT EXISTS workers ( info TEXT NOT NULL, lastseen INTEGER NOT NULL)''')
    def dump(self):
        """!Print database contents to the terminal.

        This function is only meant for debugging.  It prints an
        arguably human-readable display of the complete products and
        metadata tables.  The tables are read inside one transaction;
        printing happens after the lock is released."""
        with self.transaction() as t:
            products=t.query('SELECT id,available,location,type FROM products')
            meta=t.query('SELECT id,key,value FROM metadata')
        # print(single_argument) behaves the same in Python 2 and 3.
        print('TABLE products:')
        taskmap={UNSTARTED:'UNSTARTED',FAILED:'FAILED',RUNNING:'RUNNING',
                 PARTIAL:'PARTIAL',COMPLETED:'COMPLETED'}
        for (what,avail,loc,typ) in products:
            if typ=='Task' and avail in taskmap:
                print("id=%s available=%s (%s) location=%s type=%s" %
                      ( what,avail,taskmap[avail],loc,typ ))
            elif typ=='Product':
                print("id=%s available=%s (%s) location=%s type=%s" %
                      ( what,avail,repr(bool(avail)),loc,typ ))
            else:
                print("id=%s available=%s location=%s type=%s" %
                      (what,avail,loc,typ))
        print('TABLE metadata:')
        for row in meta:
            print('%s[%s]=%s' % row)
267 
268 ########################################################################
269 
class Transaction(object):
    """!Datastore transaction support.

    Implements sqlite3 transactions on a Datastore.  A transaction is
    a set of reads and updates that is committed to the database as a
    group when the outermost Transaction of the current thread exits.
    Note that the commit happens even if the with-block exits through
    an exception, so statements executed before the exception are kept.
    Transactions also speed up the script, sometimes by as much as a
    factor of 300, by grouping I/O operations to the database into one
    large chunk.  However, one must be careful in using them since they
    keep the database locked for an extended period of time.

    This class should not be used directly.  Instead, one should do
    this to a Datum (Task or Product) object:

      with datum_object.transaction() as t:
          ... do things to the datum object ...
      transaction is now complete, database is updated."""
    def __init__(self,ds):
        """!Transaction constructor.

        Creates the Transaction object but does NOT initiate the
        transaction.
        @param ds the Datastore this transaction operates on"""
        self.ds=ds

    def __enter__(self):
        """!Locks the database for the current thread, if it isn't
        already locked.

        Nested transactions in the same thread only push onto the
        thread's stack; the first one takes the lock.  If taking the
        lock fails, this Transaction is removed from the stack again,
        because __exit__ is not called when __enter__ raises."""
        with self.ds._mystack() as s:
            first=not s # True = first transaction from this thread
            s.append(self)
        if first:
            try:
                self.ds._lock()
            except BaseException:
                with self.ds._mystack() as s:
                    s.pop()
                raise
        return self
    def __exit__(self,etype,evalue,traceback):
        """!Commits and releases the database lock if this is the last
        Transaction released for the current thread.

        The lock is released even if the commit raises.
        @param etype,evalue Exception type and value, if any.
        @param traceback Exception traceback, if any."""
        with self.ds._mystack() as s:
            # The pop must not live inside the assert: asserts are
            # stripped under "python -O", which would leave the stack
            # non-empty and the lock held forever.
            popped=s.pop()
            last=not s
        assert popped is self
        if last:
            try:
                self.ds._connection().commit()
            finally:
                self.ds._unlock()
    def query(self,stmt,subvals=()):
        """!Performs an SQL query returning the result of cursor.fetchall()
        @param stmt the SQL query
        @param subvals the substitution values"""
        cursor=self.ds._connection().execute(stmt,subvals)
        return cursor.fetchall()
    def mutate(self,stmt,subvals=()):
        """!Performs an SQL database modification, returning the result
        of cursor.lastrowid
        @param stmt the SQL query
        @param subvals the substitution values"""
        cursor=self.ds._connection().execute(stmt,subvals)
        return cursor.lastrowid
    def init_datum(self,d,meta=True):
        """!Add a Datum to the database if it is not there already.

        Given a Datum, add the object to the database if it is not
        there already and fill the object with data from the database.
        An existing row keeps its availability; its location is only
        filled in if it is empty in the database.
        @param d the Datum
        @param meta If True, also initialize metadata (existing keys in
          the database win) and reload d._meta from the database."""
        prodtype=type(d).__name__
        av=d._meta.get('available',0)
        loc=d._meta.get('location','')
        self.mutate('INSERT OR IGNORE INTO products VALUES (?,?,?,?)',(d.did,av,loc,prodtype))
        if loc is not None and loc!='':
            # Update the location if it is not set in the product
            # table, but is set in the initial values.
            self.mutate("UPDATE products SET location=? WHERE id=? "
                        "AND (location IS NULL OR location='')",
                        (loc,d.did))
        if meta and d._meta:
            for k,v in d._meta.items():
                if k!='location' and k!='available':
                    self.mutate('INSERT OR IGNORE INTO metadata VALUES (?,?,?)',(d.did,k,v))
        if meta:
            self.refresh_meta(d,or_add=False)
    def update_datum(self,d):
        """!Update database availability and location records.

        Given a Datum whose location and availability is set, update
        that information in the database, adding the Datum if
        necessary.
        @param d the Datum"""
        loc=str(d._meta['location'])
        av=int(d._meta['available'])
        self.mutate('INSERT OR REPLACE INTO products VALUES (?,?,?,?)',
                    (d.did,av,loc,type(d).__name__))
    def refresh_meta(self,d,or_add=True):
        """!Replace Datum metadata with database values.

        Given a Datum d, discards and replaces d._meta with the
        current metadata, location and availability from the database.
        @param d The Datum.
        @param or_add If True and the product row is missing, the row is
          inserted from d's location/availability (metadata is not
          inserted).  NOTE(review): in that case d._meta is still reset
          to available=0, location='' rather than the inserted values."""
        rows=self.query('SELECT id, available, location FROM products WHERE id = ?',(d.did,))
        if rows:
            (did,av,loc)=rows[0]
            meta={'available':av,'location':loc}
        else:
            if or_add:
                self.init_datum(d,meta=False)
            meta={'available':0,'location':''}
        for (did,k,v) in self.query('SELECT id, key, value FROM metadata WHERE id = ?',(d.did,)):
            meta[k]=v
        d._meta=meta
    def set_meta(self,d,k,v):
        """!Sets metadata key k to value v for the given Datum.

        Modifies the database entries for key k and datum d to have
        the value v.  If k is location or available, then the product
        table will be updated instead.
        @param d The Datum
        @param k The metadata key.
        @param v The value, a string (converted to int for "available")."""
        if k=='location':
            self.mutate('UPDATE OR IGNORE products SET location = ? WHERE id = ?',(v,d.did))
        elif k=='available':
            self.mutate('UPDATE OR IGNORE products SET available = ? WHERE id = ?',(int(v),d.did))
        else:
            self.mutate('INSERT OR REPLACE INTO metadata VALUES (?,?,?)',(d.did,k,v))
    def del_meta(self,d,k):
        """!Delete metadata from the database.

        Deletes the specified metadata key k, which must not be
        "location" or "available".
        @param d The Datum whose metadata is being deleted.
        @param k The metadata key, which cannot be "available" or "location"."""
        assert(k != 'available' and k != 'location')
        self.mutate('DELETE FROM metadata WHERE id=? AND key=?',(d.did,k))

    ##@var ds
    # The Datastore containing the database for which this is a transaction.
    # @memberof produtil::datastore::Transaction
423 
424 ########################################################################
425 
class Datum(object):
    """!Superclass of anything that can be stored in a Datastore.

    This is the superclass of anything that can be placed in a
    datastore.  It has a category, a product name (prodname for
    short), a location, availability (an int) and arbitrary metadata
    (key,value) pairs.  It caches database metadata in self._meta,
    which is directly accessed by the Transaction class.  Cache data
    will be discarded once its age is older than self._cacheage."""
    def __init__(self,dstore,prodname,category,meta=None,cache=30,location=None,**kwargs):
        """!Datum constructor.

        Creates a Datum in the given Datastore dstore, under the
        specified category and product name prodname.  The datastore
        id used is "category::prodname".  The value for "cache" is the
        number of seconds to retain cached metadata before going back
        to disk to reread it.  That only applies to data "get"
        operations: setting a data or metadata value will cause an
        immediate write to the database.  Also, __getitem__
        (self["var"]) will force a fetch from the database if the
        requested metadata variable is not in the cached copy of the
        database.

        Values for location and meta are only the DEFAULT values, and
        will be ignored if other values are already set in the
        database.  The location is only used if the product is not
        already in the database: its location will be set to the
        provided values upon entry.  Similarly, the metadata is only
        set in this call if there isn't already metadata for the
        product with the given metadata keys.
        @param dstore The Datastore for this Datum.
        @param prodname The product name portion of the Datum ID
        @param category The category part of the Datum ID
        @param meta A dict of metadata values.
        @param cache Metadata cache lifetime in seconds.  A false value
          (0, None, False) disables caching so every read goes to the
          database.  True, or a value that is not a number, selects the
          default lifetime of 30 seconds.
        @param location The initial value for location, if it is not set already in the database.
        @param kwargs Ignored.
        @raise InvalidID if prodname or category contains "::"."""
        (self._dstore,self._prodname,self._category) = (dstore,str(prodname),str(category))
        self._id=self._category+'::'+self._prodname
        self._cachetime=time.time()
        if not cache:
            self._cacheage=-1      # negative: cached data is always stale
        elif cache is True:
            self._cacheage=30
        else:
            try:
                self._cacheage=float(cache)
            except (TypeError,ValueError):
                self._cacheage=30
        self.validate()
        if meta is None:
            self._meta=dict()
        else:
            self._meta=dict(meta)
        if location is not None:
            self._meta['location']=str(location)
        if 'available' in self._meta:
            self._meta['available']=int(self._meta['available'])
        self._lock=threading.RLock()
        with self._dstore.transaction() as t:
            t.init_datum(self)

    # Lock/unlock self:
    def __enter__(self):
        """!Acquires this object's thread lock.  This is used to manage cached data."""
        self._lock.acquire()
        return self
    def __exit__(self,etype,evalue,traceback):
        """!Releases this object's thread lock.  This is used to manage cached data.
        @param etype,evalue,traceback Exception information."""
        self._lock.release()

    def validate(self):
        """!Validates this object's prodname and category.

        A plain substring test is used (rather than a regex) so that a
        "::" following a newline is also rejected.
        @raise InvalidID if either part contains a double colon, which
          is the separator used in the database ID."""
        if '::' in self._prodname:
            raise InvalidID('%s: the prodname cannot contain a double colon (::)'%(self._id))
        if '::' in self._category:
            raise InvalidID('%s: the category cannot contain a double colon (::)'%(self._id))

    # Getter/setters to implement the properties:
    def getid(self):
        """!Returns the database ID of this datum."""
        return self._id
    def getdatastore(self):
        """!Returns the datastore of this datum."""
        return self._dstore
    def transaction(self):
        """!Creates, but does not lock, a Transaction for this datum's datastore."""
        return self._dstore.transaction()
    def getprodtype(self):
        """!Returns the product type of this Datum.

        Returns the product type of this Datum.  This is generally the
        name of the Python class that created the entry in the
        database."""
        return type(self).__name__
    def getprodname(self):
        """!Returns the product name part of the database ID."""
        return self._prodname
    def getcategory(self):
        """!Returns the product category part of the database ID."""
        return self._category

    def getlocation(self):
        """!Returns the "location" field of this Datum's database entry."""
        return self['location']
    def setlocation(self,v):
        """!Sets the "location" field of this Datum's database entry.
        @param v the new location"""
        self['location']=v

    ##@property prodname
    # Read-only property, an alias for getprodname(): the product name
    # part of the database ID.
    prodname=property(getprodname,None,None,"""the product name (read-only)""")

    ##@property category
    # Read-only property, an alias for getcategory(), the category
    # name part of the database ID.
    category=property(getcategory,None,None,"""the category (read-only)""")

    ##@property prodtype
    # Read-only property, an alias for getprodtype(), the product
    # type.  This is generally the name of the Python class that
    # created the entry in the database.
    prodtype=property(getprodtype,None,None,
                      """Returns the prodtype for this Datum: its class name (read-only)""")

    ##@property did
    # Read-only property, an alias for getid().  The full database ID.
    did=property(getid,None,None,
                 """Returns the database id for this Datum (read-only)""")

    ## @property dstore
    # Read-only property, an alias for getdatastore(), the Datastore
    # in which this Datum resides.
    dstore=property(getdatastore,None,None,
                    """Gets the Datastore object that contains this Datum (read-only)""")

    ## @property location
    # Read-write property, an alias for getlocation() and
    # setlocation().  The location on disk of this Datum.
    location=property(getlocation,setlocation,None,
                      """The location of this product (read/write)""")

    def __hash__(self):
        """!Integer hash function."""
        return hash(self._prodname)^hash(self._category)
    def __str__(self):
        """!Human-readable description of this Datum."""
        return '%s with id %s'%(self.prodtype,self.did)
    def __repr__(self):
        """!Python code-like description of this Datum."""
        return '%s(%s,%s,%s)' % \
            (self.prodtype,repr(self.dstore),repr(self._prodname),repr(self._category))
    def __cmp__(self,other):
        """!Compares two Datums' prodnames, then categories.
        @param other the other datum to compare against
        @returns negative, zero or positive like the Python 2 cmp(),
          or NotImplemented if other is not a Datum"""
        if not isinstance(other,Datum): return NotImplemented
        mine=(self._prodname,self._category)
        theirs=(other._prodname,other._category)
        # Same result as cmp() on the tuples, without the Python-2-only builtin.
        return (mine>theirs)-(mine<theirs)
    def set_loc_avail(self,loc,avail):
        """!Sets the location and availability of this Datum in a
        single transaction.
        @param loc the new location, a string
        @param avail the new availability, an int"""
        with self:
            self._meta['location']=str(loc)
            self._meta['available']=int(avail)
            with self.transaction() as t:
                t.update_datum(self)
    def _getcache(self,k=None,force=False):
        """!Requests or forces a cache update.

        This is the implementation of metadata/location/available
        caching.  It returns self._meta if the cache has not aged out
        (and k, if provided, is in self._meta) or goes to the
        Datastore to update the cache, and then returns the resulting
        self._meta.  This MUST be called from within a "with self".
        @param k The key of interest.
        @param force If True, forces a cache update even if the
          cache is not expired."""
        if not force:
            age=time.time()-self._cachetime
            if age<self._cacheage and (k is None or k in self._meta):
                return self._meta
        with self.transaction() as t:
            t.refresh_meta(self)
            self._cachetime=time.time()
        return self._meta
    def update(self):
        """!Discards all cached metadata and refreshes it from the
        Datastore."""
        with self:
            self._getcache(force=True)
    def __getitem__(self,k):
        """!Returns the value of the specified metadata key or raises
        KeyError.  Forces a cache update if k is not in the cache."""
        with self:
            meta=self._getcache(k)
            return meta[k]
    def meta(self,k,default=None):
        """!Return the value of a metadata key

        Returns the value of the specified metadata key or returns
        default if it is unset.  Does NOT force a cache update if k is
        missing from the cache.  To force a cache update, use
        __getitem__ instead.
        @param k The key of interest.
        @param default The value to return if no value is seen.
        @returns The metadata value or the default."""
        with self:
            return self._getcache().get(k,default)
    def get(self,k,default=None):
        """!Alias for self.meta()

        Returns the value of the specified metadata key or returns
        default if it is unset.  Does NOT force a cache update if k is
        missing from the cache.  To force a cache update, use
        __getitem__ instead.
        @param k The key of interest.
        @param default The value to return if no value is seen.
        @returns The metadata value or the default."""
        return self.meta(k,default)

    def __setitem__(self,k,v):
        """!Sets the value of the specified metadata key, writing it to
        the database immediately.
        @param k the key
        @param v the value"""
        with self:
            with self.transaction() as t:
                t.set_meta(self,k,v)
                self._meta[k]=v
    def __delitem__(self,k):
        """!Deletes the specified metadata key, which must not be
        "available" or "location".
        @param k the key to delete"""
        assert(k != 'available' and k != 'location')
        with self:
            with self.transaction() as t:
                t.del_meta(self,k)
                if k in self._meta:
                    del self._meta[k]
    def __contains__(self,k):
        """!Determines if a metadata key is set.

        Only refreshes the cache if it has aged out; a key missing from
        a fresh cache is reported as absent.
        @returns True if the specified metadata key is set, and False
          otherwise.  Immediately returns True for 'available' and
          'location' without checking the metadata cache.
        @param k the key of interest"""
        if k=='available' or k=='location':
            return True
        with self:
            return k in self._getcache()
    def iteritems(self):
        """!Iterates over all metadata (key,value) pairs for this
        Datum, including "available" and "location" (yielded first).

        A snapshot is taken under the lock; the lock is not held while
        the caller consumes the pairs."""
        with self:
            meta=dict(self._getcache())
        assert('available' in meta)
        assert('location' in meta)
        yield 'available',meta['available']
        yield 'location',meta['location']
        for k,v in meta.items():
            if k!='location' and k!='available':
                yield k,v
689 
690 ########################################################################
691 
class CallbackExceptions(Exception):
    """!Aggregate exception raised by Product.call_callbacks after every
    callback has run, when one or more of them raised an Exception."""
    def __init__(self,message,exlist):
        """!CallbackExceptions constructor.

        The exception's message is the base message followed by one
        line per collected exception.
        @param message The beginning of the exception message.  Each
          exception's message will be appended to this.
        @param exlist The list of exceptions."""
        self.messagebase=message
        self.exlist=exlist
        details=''.join("\n %s"%(str(ex),) for ex in exlist)
        full=message+details if details else message
        Exception.__init__(self,full,exlist)

    ##@var exlist
    # The list of exceptions raised.

    ##@var messagebase
    # The original message sent to the constructor before
    # per-exception messages were appended.
713 
714 ########################################################################
715 
716 class Product(Datum):
717  """!A piece of data produced by a Task.
718 
719  A Product is a piece of data that can be produced by a Task. Once
720  the product is available, self.available or self.is_available()
721  will be True, and the self.location will be valid. The meaning of
722  self.location is up to the implementer to decide, but it should be
723  a full path to a location on disk for file products. As with all
724  Datum objects, a Product also has arbitrary metadata."""
725  def add_callback(self,callback,args=None,states=None):
726  """!Adds a delivery callback function.
727 
728  Adds a delivery callback that is called when the product is
729  delivered. This is intended to do such tasks as running an
730  NCO dbn_alert, or copying to a website, or emailing someone.
731  This function is only added in this local Python instance, not
732  in the database file. Also, it is the responsibility of the
733  subclasses to call self.call_callbacks() from self.deliver()
734  to ensure the callbacks are run.
735 
736  Example:
737  @code{.py}
738  def callback(name,*args,**kwargs):
739  print "My fancy product %s was delivered!"%(name,)
740  product.add_callback(callback,[product.prodname])
741  @endcode
742 
743  @param callback The callback function, which must be able to
744  take any keyword or indexed arguments.
745  @param args The indexed arguments to send.
746  @param states Presently unused."""
747  if args is None:
748  largs=list()
749  else:
750  largs=list(args)
751  calldata=[callback,largs]
752  if '_callbacks' not in self.__dict__: setattr(self,'_callbacks',list())
753  self._callbacks.append(calldata)
754  def has_callbacks(self):
755  """!Returns True if this Product has any callback functions
756  and False otherwise"""
757  if '_callbacks' not in self.__dict__: return False
758  return len(self._callbacks)>0
def call_callbacks(self,logger=None):
    """!Calls all delivery callback functions.

    Runs every callback registered with add_callback().  Each one is
    passed its stored positional arguments plus this Product's cached
    metadata (from self._getcache()) as keyword arguments.  An
    Exception raised by one callback does not stop the others.  Each
    failure is logged at WARNING level with its traceback and saved.
    Once every callback has run, a single CallbackExceptions holding
    all of the saved exceptions is raised.

    Subclasses should call this from either check() or deliver(), as
    appropriate for the product type.
    @param logger Optional: the logging.Logger for logging messages.
      Defaults to the "produtil.datastore" logger.
    @raise CallbackExceptions if any callback raised an Exception."""
    callbacks=self.__dict__.get('_callbacks')
    if not callbacks:
        return
    if logger is None:
        logger=logging.getLogger('produtil.datastore')
    meta=self._getcache()
    failures=[]
    for callback,args in callbacks:
        try:
            callback(*args,**meta)
        except Exception as e:
            failures.append(e)
            logger.warning(str(e),exc_info=True)
    if failures:
        raise CallbackExceptions('%s: exceptions caught when delivering this product'%(self.did,),failures)
def check(self,**kwargs):
    """!Asks the product to check its own availability and update
    the database.

    The base implementation discards cached metadata with
    self.update() and then reports the refreshed availability.
    Checking is generally not cheap: subclasses may take seconds or
    minutes and may fail.  Use the "available" property instead when
    cached information is sufficient.
    @param kwargs Ignored here; accepted for use by subclasses.
    @returns the value of self.available after the refresh."""
    self.update()
    result=self.available
    return result
def deliver(self,**kwargs):
    """!Asks the Product to deliver itself.

    Delivery moves a product to its destination.  It is generally
    expensive (seconds or minutes) and may fail.  It may involve
    copying many files, network access, or even pulling tapes from a
    silo.  Implementations are expected to update the location and
    availability in the database when they finish.  This base class
    does not implement delivery.
    @param kwargs Ignored here; accepted for use by subclasses.
    @raise InvalidOperation always, from this base implementation.
    @post available=True and location is non-empty (for subclasses
      that implement delivery)."""
    message='The Product base class does not implement deliver'
    raise InvalidOperation(message)
def undeliver(self,**kwargs):
    """!"Undelivers" a product.

    The meaning is up to each subclass: it could be deleting an
    output file, or some other action.  Either way, the result must
    be self.available==False or a raised exception.  Like delivery,
    this can be expensive and can fail.  This default implementation
    only marks the product as unavailable.
    @param kwargs Ignored here; accepted for use by subclasses.
    @post available=False"""
    self.available=False
def setavailable(self,val):
    """!Sets the availability to the specified value.

    Stores the "available" field of this Product in the database.
    The value is normalized as int(bool(val)): any truthy value is
    stored as 1 and any falsy value as 0.
    @param val the new availability"""
    flag=1 if val else 0
    self['available']=flag
def is_available(self):
    """!Is the product available?

    Reads the "available" field of this Product from the database,
    converts it with int(), and reports whether it is nonzero.
    @returns True if the stored value is nonzero, False otherwise."""
    return int(self['available'])!=0
837 
##@property available
# Read-write property: is the product available?  Reading it calls
# is_available(); assigning to it calls setavailable().
available=property(fget=is_available,fset=setavailable,fdel=None,
                   doc="""The availability of this product as a bool (read/write)""")
842 
def validate(self):
    """!Validates this object's Datastore, prodname and category.

    In addition to the requirements imposed by Datum.validate(), a
    Product's category must not contain a double star ("**")
    anywhere.
    @raise InvalidID if the category contains "**".  Other exceptions
      may come from Datum.validate()."""
    # search() rather than match(): match() only looks at the start of
    # the string, so unless the module-level pattern is written to
    # span the whole string, a "**" in the middle of the category
    # would slip through.
    if _has_dstar.search(self._category):
        raise InvalidID('%s: Product categories cannot contain double stars (**)'%(self._id,))
    super(Product,self).validate()
853 
854 ########################################################################
855 
class FileProduct(Product):
    """!A subclass of Product that represents file delivery.

    This subclass of Product represents a file that is delivered by
    this workflow.  The deliver() method copies (or moves) the file
    into place, and undeliver() deletes it.
    produtil.fileop.deliver_file() and produtil.fileop.remove_file()
    are used for this purpose."""
    def undeliver(self,delete=True,logger=None):
        """!Undoes the effect of deliver()

        Sets this Product's available attribute to False.  If delete
        is True and a location is known, the file at that location is
        also deleted with produtil.fileop.remove_file().
        @param delete if True, the file is deleted
        @param logger a logging.Logger for log messages"""
        loc=self.location
        if loc and delete:
            produtil.fileop.remove_file(filename=loc,logger=logger,info=True)
        self.available=False
    def deliver(self,location=None,frominfo=None,keep=True,logger=None,
                copier=None):
        """!Delivers the file to a destination.

        Copies or moves the file named by frominfo to the destination
        using produtil.fileop.deliver_file().  The keep, copier and
        logger arguments are passed on unmodified.
        - If location is given, the location and availability are
          recorded together with set_loc_avail().
        - Otherwise the stored location is used and only the
          availability is updated.
        Delivery callbacks are then run with call_callbacks().
        @param location Optional: the new location, a string filename.
          If None, self.location is used.
        @param frominfo Where to get the file from, a string filename.
        @param keep If True, the original file is always kept.  If False,
          the original file may be moved to the destination instead of copied.
        @param logger a logging.Logger for log messages
        @param copier Passed to the copier argument of
          produtil.fileop.deliver_file()
        @raise TypeError if frominfo is not a string, or if location is
          neither None nor a string.
        @raise UnknownLocation if location is None and self.location is
          also None.
        @raise CallbackExceptions if a delivery callback fails.  This is
          raised by call_callbacks() after the database is updated.
        @post The file is at the location specified, and the database
          location and availability are updated accordingly."""
        if not isinstance(frominfo,basestring):
            raise TypeError('FileProduct.deliver requires a string filename as its frominfo argument. You provided an object of type %s.'%(type(frominfo).__name__))
        if location is not None and not isinstance(location,basestring):
            # Report the type of the offending argument (location), not
            # of frominfo, which has already been validated above.
            raise TypeError('FileProduct.deliver requires a location argument that is None or a string filename. You provided an object of type %s.'%(type(location).__name__))
        loc=location
        setloc=True
        if loc is None:
            setloc=False
            loc=self.location
        if loc is None:
            raise UnknownLocation(
                '%s: no location known when delivering product. Specify a '
                'location to deliver().'%(self.did,))
        produtil.fileop.deliver_file(frominfo,loc,keep=keep,logger=logger,
                                     copier=copier)
        if setloc:
            # New location: store location and availability together.
            self.set_loc_avail(loc,True)
        else:
            self.available=True
        self.call_callbacks(logger=logger)
912 
913 ########################################################################
914 
class UpstreamFile(Product):
    """!Represents a Product created by an external workflow.

    This subclass of Product represents a file that is created by an
    external workflow.  It implements a check() call that determines
    whether the file meets a minimum size and a minimum age, as judged
    by produtil.fileop.check_file().  Once the file qualifies, check()
    sets the location and availability.  Calls to deliver() raise
    InvalidOperation.  undeliver() only clears the "available" flag
    and never touches the file."""
    def check(self,frominfo=None,minsize=None,minage=None,logger=None):
        """!Checks the specified file to see if it is available.

        Looks for the file on disk and updates the "available" and
        "location" attributes of this Product.  Two metadata values
        decide whether an existing file counts as "available":

          self["minsize"] - minimum size in bytes. Default: 0
          self["minage"] - minimum age in seconds. Default: 20

        Both can be overridden by the corresponding arguments.  Be
        careful with the minimum age on poorly-maintained clusters,
        because of clock skew.  When the file qualifies, delivery
        callbacks are run with call_callbacks().
        @param frominfo Optional: where to look for the file instead of
          looking at self.location.  If given and the file qualifies,
          it also becomes the stored location.
        @param minsize Optional: the minimum file size in bytes
        @param minage Optional: the minimum file modification time age
          in seconds.
        @param logger Optional: a logging.Logger for log messages,
          passed to call_callbacks().
        @returns True if the file is available, False otherwise.
        @raise TypeError if frominfo is neither None nor a string.
        @raise UnknownLocation if frominfo is None and no location is
          stored for this product.
        @raise CallbackExceptions if a delivery callback fails."""
        loc=frominfo
        setloc=True
        if loc is None:
            setloc=False
            loc=self.location
        elif not isinstance(loc,basestring):
            raise TypeError('UpstreamFile.check requires a frominfo argument that is either None or a string filename. You provided an object of type %s.'%(type(frominfo).__name__))
        if loc is None:
            # Raise a real exception rather than using assert, which is
            # stripped when Python runs with -O.  This matches how
            # FileProduct.deliver reports a missing location.
            raise UnknownLocation(
                '%s: no location known when checking product. Specify a '
                'location to check().'%(self.did,))
        if minsize is None:
            minsize=int(self.get('minsize',0))
        if minage is None:
            minage=int(self.get('minage',20))
        if not produtil.fileop.check_file(loc,min_size=minsize,
                                          min_mtime_age=minage):
            # Only write to the database if the state actually changes.
            if self.available:
                self.available=False
            return False
        if setloc:
            self.set_loc_avail(loc,True)
        else:
            self.available=True
        self.call_callbacks(logger=logger)
        return True
    def undeliver(self,**kwargs):
        """!Undelivering an UpstreamFile merely sets the internal
        "available" flag to False.  It does not remove the data.
        @param kwargs Ignored.  Accepted so that generic code that calls,
          for example, undeliver(delete=False) on a mix of products
          also works for UpstreamFile."""
        self.available=False
    def deliver(self,location=None,frominfo=None,**kwargs):
        """!Raises InvalidOperation. You cannot deliver an upstream
        file. The upstream workflow must do that for you. Call
        check() instead to see if the file has been delivered.
        @param location,frominfo,kwargs Ignored.  Extra keywords are
          accepted so that any delivery call fails with
          InvalidOperation rather than TypeError.
        @raise InvalidOperation always."""
        raise InvalidOperation('Internal error: the scripts attempted to deliver an upstream product.')
974 
975 ########################################################################
976 
def wait_for_products(plist,logger,renamer=None,action=None,
                      renamer_args=None,action_args=None,sleeptime=20,
                      maxtime=1800):
    """!Waits for products to be available and performs an action on them.

    Waits for a specified list of products to be available, and
    performs some action on each product when it becomes available.
    Sleeps between checks.  Returns the number of products that were
    found before maxtime was reached.

    @param plist A Product, or a list or tuple of Product objects.
      Entries that compare equal (the same product listed twice) are
      counted once.
    @param logger A logging.Logger object in which to log messages.
    @param renamer Optional: a function or callable object that
      provides a new name for each product.  This is passed the
      product, the logger and the contents of *renamer_args.  It is
      only called when an action is given.
      Default: os.path.basename(p.location), ignoring renamer_args.
    @param action Optional: an action to perform on each product.
      This is passed the product, the output of renamer, the logger
      and the contents of *action_args.  Default: perform no action.
    @param renamer_args Optional: arguments to renamer.
    @param action_args Optional: arguments to action.
    @param sleeptime - after checking availability of all products, if
      at least one is unavailable, the code will sleep for this much
      time before rechecking.  The sleep is shortened so it does not
      run past maxtime, and is never less than 0.01 seconds.
      Default: 20
    @param maxtime - maximum amount of time to spend in this routine
      before giving up.  Default: 1800
    @returns the number of distinct products that became available
      before the maximum wait time was hit.
    @raise TypeError if plist is not a Product, list or tuple, or if it
      contains anything other than Product objects."""
    if renamer is None:
        # Accept and ignore any renamer_args, so that supplying
        # renamer_args without a custom renamer is not a TypeError.
        renamer=lambda p,l,*args: os.path.basename(p.location)
    if isinstance(plist,Product):
        plist=[plist]
    if not ( isinstance(plist,tuple) or isinstance(plist,list) ):
        raise TypeError('In wait_for_products, plist must be a '
                        'list or tuple, not a '+type(plist).__name__)
    now=int(time.time())
    start=now
    seen=set()
    for p in plist:
        if not isinstance(p,Product):
            raise TypeError('In wait_for_products, plist must only '
                            'contain Product objects.')
    if renamer_args is None: renamer_args=list()
    if action_args is None: action_args=list()
    # "seen" is a set, so equal products listed more than once only
    # count once.  Compare against the number of distinct products.
    # Comparing against len(plist) would keep this routine waiting
    # until maxtime whenever plist contains a duplicate.
    nwanted=len(set(plist))
    logger.info('Waiting for %d products.'%(nwanted,))
    while len(seen)<nwanted and now<start+maxtime:
        for p in plist:
            if p in seen: continue
            if not p.available: p.check()
            if p.available:
                logger.info('Product %s is available at location %s'
                            %(repr(p.did),repr(p.location)))
                seen.add(p)
                if action is not None:
                    name=renamer(p,logger,*renamer_args)
                    action(p,name,logger,*action_args)
            else:
                logger.info(
                    'Product %s not available (available=%s location=%s).'
                    %(repr(p.did),repr(p.available),repr(p.location)))
        now=int(time.time())
        if now<start+maxtime and len(seen)<nwanted:
            # Sleep at most until just before the deadline.
            sleepnow=max(0.01,min(sleeptime,start+maxtime-now-1))
            logfun=logger.info if (sleepnow>=5) else logger.debug
            logfun('Sleeping %g seconds...'%(float(sleepnow),))
            time.sleep(sleepnow)
            logfun('Done sleeping.')
    logger.info('Done waiting for products: found %d of %d products.'
                %(len(seen),nwanted))
    return len(seen)
1049 
1050 ########################################################################
1051 
class Task(Datum):
    """!Represents a process or actor that makes a Product.

    A Task represents some work that needs to be done to produce
    Products.  A task has:
    - a state, stored in the "available" metadata attribute;
    - a location, whose meaning is up to the implementer to decide;
    - a logging.Logger.
    As with all Datum subclasses, a Task also has arbitrary metadata."""
    def __init__(self,dstore,taskname,logger=None,**kwargs):
        """!Task constructor

        Creates a new Task in the given Datastore with the given task
        name.  The task is stored under the TASK_CATEGORY category.
        @param dstore the Datastore where this Task will live
        @param taskname the task name, passed to the Datum as prodname
        @param logger a logging.Logger for this task to use for log
          messages.  Default: logging.getLogger(taskname)
        @param kwargs other keyword arguments are passed to Datum.__init__"""
        if logger is None:
            logger=logging.getLogger(taskname)
        self._logger=logger
        Datum.__init__(self,dstore=dstore,prodname=taskname,category=TASK_CATEGORY,**kwargs)

    @property
    def jlogfile(self):
        """!returns the jlogfile logger.

        Returns a logging.Logger for the jlogfile.  The jlogfile is
        intended to receive only major errors, and per-job start and
        completion information.  This is equivalent to simply
        accessing produtil.log.jlogger."""
        return produtil.log.jlogger

    def postmsg(self,message,*args,**kwargs):
        """!same as produtil.log.jlogger.info()

        Sends a message to the multi-job shared log file at level
        INFO.
        @param message the message
        @param args positional arguments for string replacement
        @param kwargs keyword arguments passed on unchanged to
          produtil.log.jlogger.info()"""
        produtil.log.jlogger.info(message,*args,**kwargs)

    def setstate(self,val):
        """!Sets the state of this job.

        Sets the job state to the specified value by setting the
        "available" attribute to int(val).  For compatibility with
        other scripts, this should be FAILED, UNSTARTED, RUNNING,
        PARTIAL or COMPLETED.
        @param val the new job state, an int"""
        self['available']=int(val)
    def getstate(self):
        """!Returns the job state.

        Returns the "available" attribute as an integer.  This is used
        as the state of the Task.  Typically, the return value should
        be one of: FAILED, UNSTARTED, RUNNING, PARTIAL, or COMPLETED."""
        return int(self['available'])

    ##@property state
    # Read-write property: the job state. Can be FAILED, UNSTARTED,
    # RUNNING, PARTIAL or COMPLETED.
    state=property(getstate,setstate,None,
                   """The state of this Task as an int (read/write)""")

    @property
    def strstate(self):
        """!A string representation of the job state.

        @returns "FAILED", "UNSTARTED", "RUNNING", "PARTIAL" or
          "COMPLETED", or "UNRECOGNIZED(n)" for any other integer n."""
        s=int(self['available'])
        if s==FAILED: return 'FAILED'
        elif s==UNSTARTED: return 'UNSTARTED'
        elif s==RUNNING: return 'RUNNING'
        elif s==PARTIAL: return 'PARTIAL'
        elif s==COMPLETED: return 'COMPLETED'
        else: return 'UNRECOGNIZED(%d)'%s
    def gettaskname(self):
        """!Returns the task name part of the database ID, which is the
        same as the prodname."""
        return self._prodname

    ##@property taskname
    # Read-only property: the name of this task. Same as self.prodname.
    taskname=property(gettaskname,None,None,
                      """!The task name (read-only, same as prodname)""")
    def products(self,*args,**kwargs):
        """!Iterate over the products this task produces.

        Iterates over some or all of the products produced by this
        task.  The arguments are used to select subsets of the total
        set of products.  Provide no arguments to get the full list of
        products.  All subclasses should re-implement this method, and
        interpret the arguments in a way that makes sense to that
        class.  The default implementation yields nothing.
        @param args,kwargs Implementation-defined, used by subclasses."""
        return
        for x in []: yield x # makes this a generator function, so callers always get an iterator
    def log(self):
        """!Returns the logger object for this task."""
        return self._logger
    def clean(self):
        """!Cleans up any unneeded data used by this task.

        Subclasses should override this function to clean up any
        unneeded temporary files or other unused resources consumed by
        the run() function.  This default implementation does nothing."""
        pass
    def unrun(self):
        """!Undoes the effect of run().

        Cleans up this Task's work areas, "undelivers" all
        deliverables, and makes it look like the task has never been
        run.  All subclasses should re-implement this method, and must
        also "unrun" everything their parent class runs.  The default
        implementation simply calls self.clean() and sets the state to
        UNSTARTED.
        @post self.state=UNSTARTED"""
        self.clean()
        self.state=UNSTARTED
    def run(self):
        """!Performs the work this Task should do and generates all products.

        Performs the work that this task is supposed to do.  All
        subclasses should re-implement this method, and should set the
        state to COMPLETED at the end.  This implementation simply calls
        self.setstate(COMPLETED)
        @post self.state=COMPLETED"""
        self.setstate(COMPLETED)
    def is_completed(self):
        """!Is this task complete?

        Returns True if this task's state is COMPLETED, and False
        otherwise."""
        return self.state==COMPLETED

    ##Read-only property: is this task completed? Same as is_completed()
    @property
    def completed(self):
        """!True if self.state==COMPLETED, False otherwise."""
        return self.state==COMPLETED

    def runpart(self):
        """!Run some of this task's work, deliver some products.

        Performs a subset of the work that this task is supposed to do
        and returns.  This is intended for tasks that can be broken up
        into small pieces, such as post-processing the output files
        from an NWP simulation one by one.  The default implementation
        simply calls self.run()"""
        self.run()
1202 
def __delitem__(self, k)
Deletes the specified metadata key, which must not be "available" or "location".
Definition: datastore.py:657
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def dump(self)
Print database contents to the terminal.
Definition: datastore.py:241
def __init__(self, message, exlist)
CallbackExceptions constructor.
Definition: datastore.py:696
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
def __init__(self, dstore, prodname, category, meta=None, cache=30, location=None, kwargs)
Datum constructor.
Definition: datastore.py:435
def __exit__(self, etype, evalue, traceback)
Releases the database lock if this is the last Transaction released for the current thread...
Definition: datastore.py:303
checktime
suggestion of how often to check the lock
Definition: datastore.py:43
prodtype
Read-only property, an alias for getprodtype(), the product type.
Definition: datastore.py:546
def __repr__(self)
String representation of this object.
Definition: datastore.py:72
def transaction(self)
Creates, but does not lock, a Transaction for this datum's datastore.
Definition: datastore.py:507
def clean(self)
Cleans up any unneeded data used by this task.
Definition: datastore.py:1152
exlist
The list of exceptions raised.
Definition: datastore.py:702
def meta
Return the value of a metadata key.
Definition: datastore.py:625
def _createdb(self, con)
Creates the tables used by this Datastore.
Definition: datastore.py:228
def validate(self)
Validates this object's Datastore, prodname and category.
Definition: datastore.py:493
Superclass of all exceptions local to produtil.datastore.
Definition: datastore.py:24
Exception raised when a Product class encounters exceptions while calling its callback functions in P...
Definition: datastore.py:692
Handles file locking using Python "with" blocks.
Definition: locking.py:1
Raised when delivering data, but no location is provided.
Definition: datastore.py:82
A subclass of Product that represents file delivery.
Definition: datastore.py:856
def getcategory(self)
Returns the product category part of the database ID.
Definition: datastore.py:520
def wait_for_products
Waits for products to be available and performs an action on them.
Definition: datastore.py:979
Sets up signal handlers to ensure a clean exit.
Definition: sigsafety.py:1
def remove_file
Deletes the specified file.
Definition: fileop.py:251
db
The underlying sqlite3 database object.
Definition: datastore.py:170
def validate(self)
Validates this object's Datastore, prodname and category.
Definition: datastore.py:843
def jlogfile(self)
returns the jlogfile logger.
Definition: datastore.py:1075
def __setitem__(self, k, v)
Sets the value of the specified metadata key.
Definition: datastore.py:649
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def check_file
Determines whether the specified file exists, and meets additional requirements.
Definition: fileop.py:982
def deliver
Delivers the file to a destination.
Definition: datastore.py:875
def getdatastore(self)
Returns the datastore of this datum.
Definition: datastore.py:504
state
Read-write property: the job state.
Definition: datastore.py:1114
def update(self)
Discards all cached metadata and refreshes it from the Datastore.
Definition: datastore.py:614
def deliver
Raises InvalidOperation.
Definition: datastore.py:969
def getlocation(self)
Returns the "location" field of this Datum's database entry.
Definition: datastore.py:524
def __init__
Datastore constructor.
Definition: datastore.py:146
Represents a process or actor that makes a Product.
Definition: datastore.py:1052
def log(self)
Returns the logger object for this task.
Definition: datastore.py:1149
def init_datum
Add a Datum to the database if it is not there already.
Definition: datastore.py:327
def __enter__(self)
Locks the database for the current thread, if it isn't already locked.
Definition: datastore.py:294
owninfo
implementation-defined information about the owner
Definition: datastore.py:40
def __exit__(self, etype, evalue, traceback)
Releases this object's thread lock.
Definition: datastore.py:488
def getprodname(self)
Returns the product name part of the database ID.
Definition: datastore.py:517
def __cmp__(self, other)
Compares two Datums' prodnames and categories.
Definition: datastore.py:576
def __repr__(self)
Python code-like description of this Datum.
Definition: datastore.py:572
messagebase
The original message sent to the constructor before per-exception messages were appended.
Definition: datastore.py:701
A piece of data produced by a Task.
Definition: datastore.py:716
def add_callback
Adds a delivery callback function.
Definition: datastore.py:725
def completed(self)
Read-only property: is this task completed? Same as is_completed()
Definition: datastore.py:1189
def get
Alias for self.meta() Returns the value of the specified metadata key or returns default if it is uns...
Definition: datastore.py:637
ownseen
last time the owner checked in
Definition: datastore.py:41
location
Read-write property, an alias for getlocation() and setlocation().
Definition: datastore.py:563
def setstate(self, val)
Sets the state of this job.
Definition: datastore.py:1094
def del_meta(self, d, k)
Delete metadata from the database.
Definition: datastore.py:410
def getprodtype(self)
Returns the product type of this Datum.
Definition: datastore.py:510
def refresh_meta
Replace Datum metadata with database values, add new metadata to database.
Definition: datastore.py:368
def __init__(self, dstore, taskname, logger=None, kwargs)
Task constructor.
Definition: datastore.py:1060
ownlegacy
length of time the lock is valid
Definition: datastore.py:42
This is a fake exception used to get a stack trace.
Definition: datastore.py:20
def is_available(self)
Is the product available?
Definition: datastore.py:831
def transaction(self)
Starts a transaction on the database in the current thread.
Definition: datastore.py:225
def __init__(self, did, owner, owninfo, ownseen, ownlegacy, checktime)
DatumLockHeld constructor.
Definition: datastore.py:29
Raised when a Datum or subclass receives a prodname or category name that is invalid.
Definition: datastore.py:78
def strstate(self)
A string representation of the job state.
Definition: datastore.py:1118
Datastore transaction support.
Definition: datastore.py:270
def gettaskname(self)
Returns the task name part of the database ID, which is the same as the prodname. ...
Definition: datastore.py:1127
Raised when an invalid Datum operation is requested, such as delivering an UpstreamProduct.
Definition: datastore.py:80
def undeliver(self, kwargs)
"Undelivers" a product.
Definition: datastore.py:810
Stores information about Datum objects in a database.
Definition: datastore.py:134
def getid(self)
Returns the database ID of this datum.
Definition: datastore.py:501
Superclass of anything that can be stored in a Datastore.
Definition: datastore.py:426
available
Read-write property: is the product available?
Definition: datastore.py:840
def set_loc_avail(self, loc, avail)
Sets the location and availability of this Datum in a single transaction.
Definition: datastore.py:583
def _getcache
Requests or forces a cache update.
Definition: datastore.py:593
def deliver(self, kwargs)
Asks the Product to deliver itself.
Definition: datastore.py:798
def _connection(self)
Gets the current thread's database connection.
Definition: datastore.py:189
filename
The path to the sqlite3 database file.
Definition: datastore.py:169
Configures logging.
Definition: log.py:1
def getstate(self)
Returns the job state.
Definition: datastore.py:1103
def __enter__(self)
Acquires this object's thread lock.
Definition: datastore.py:484
did
Read-only property, an alias for getid().
Definition: datastore.py:551
Raised when a LockDatum is held by another Worker.
Definition: datastore.py:27
Automates locking of a lockfile.
Definition: locking.py:66
def runpart(self)
Run some of this task's work, deliver some products.
Definition: datastore.py:1193
def query
Performs an SQL query returning the result of cursor.fetchall()
Definition: datastore.py:314
def is_completed(self)
Is this task complete?
Definition: datastore.py:1180
def iteritems(self)
Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location"...
Definition: datastore.py:677
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
def products(self, args, kwargs)
Iterate over the products this task produces.
Definition: datastore.py:1136
owner
The owner of the lock.
Definition: datastore.py:39
def call_callbacks
Calls all delivery callback functions.
Definition: datastore.py:759
did
The database ID of the datum whose lock is held.
Definition: datastore.py:38
def has_callbacks(self)
Returns True if this Product has any callback functions and False otherwise.
Definition: datastore.py:754
def __init__(self, ds)
Transaction constructor.
Definition: datastore.py:287
def unrun(self)
Undoes the effect of run().
Definition: datastore.py:1159
def __contains__(self, k)
Determines if a metadata key is set.
Definition: datastore.py:667
def undeliver(self)
Undelivering an UpstreamFile merely sets the internal "available" flag to False.
Definition: datastore.py:965
def __str__(self)
Human-readable representation of this exception.
Definition: datastore.py:61
def setlocation(self, v)
Sets the "location" field of this Datum's database entry.
Definition: datastore.py:527
def undeliver
Undoes the effect of deliver()
Definition: datastore.py:863
def check(self, kwargs)
Asks the product to check its own availability and update the database.
Definition: datastore.py:786
def set_meta(self, d, k, v)
Sets metadata key k to value v for the given Datum.
Definition: datastore.py:395
def __hash__(self)
Integer hash function.
Definition: datastore.py:566
def __str__(self)
Human-readable description of this Datum.
Definition: datastore.py:569
def update_datum(self, d)
Update database availability and location records.
Definition: datastore.py:357
def __getitem__(self, k)
Returns the value of the specified metadata key or raises KeyError.
Definition: datastore.py:619
def setavailable(self, val)
Sets the availability to the specified value.
Definition: datastore.py:823
def run(self)
Performs the work this Task should do and generates all products.
Definition: datastore.py:1171
def mutate
Performs an SQL database modification, returning the result of cursor.lastrowid.
Definition: datastore.py:320
def check
Checks the specified file to see if it is available.
Definition: datastore.py:924
Represents a Product created by an external workflow.
Definition: datastore.py:915