1 """!Stores products and tasks in an sqlite3 database file.
3 This module maintains an sqlite3 database file that stores information
4 about Products and Tasks. A Product is a file or group of files
5 created by some Task. Both Product and Task classes derive from
6 Datum, which is the base class of anything that can be stored in the
9 import sqlite3, threading, collections, re, contextlib, time, random,\
10 traceback, datetime, logging, os, time
15 __all__=[
'DatumException',
'DatumLockHeld',
'InvalidID',
'InvalidOperation',
16 'UnknownLocation',
'FAILED',
'UNSTARTED',
'RUNNING',
'PARTIAL',
17 'COMPLETED',
'Datastore',
'Transaction',
'Datum',
'CallbackExceptions',
21 """!This is a fake exception used to get a stack trace. It will
22 never be raised outside this module."""
25 """!Superclass of all exceptions local to produtil.datastore."""
28 """!Raised when a LockDatum is held by another Worker."""
29 def __init__(self,did,owner,owninfo,ownseen,ownlegacy,checktime):
30 """!DatumLockHeld constructor.
32 @param did the database ID of the datum whose lock is held
33 @param owner the owner of the lock
34 @param owninfo implementation-defined information about the owner
35 @param ownseen last time the owner checked in
36 @param ownlegacy length of time the lock is valid
37 @param checktime suggestion of how often to check the lock"""
62 """!Human-readable representation of this exception."""
63 return '''Cannot lock Datum %s at %d (%s)
64 Was locked by worker %d (info %s)
68 datetime.datetime.utcfromtimestamp(self.
checktime).ctime(),
70 datetime.datetime.utcfromtimestamp(self.
ownseen).ctime(),
73 """!String representation of this object."""
74 return '%s(%s,%s,%s,%s,%s,%s)' % \
75 ( type(self).__name__, repr(self.
did), repr(self.
owner), repr(self.
owninfo),
79 """!Raised when a Datum or subclass receives a prodname or category name that is invalid."""
class InvalidOperation(DatumException):
    """!Signals that a Datum was asked to perform an operation it does
    not support, for example an attempt to deliver an UpstreamProduct."""
83 """!Raised when delivering data, but no location is provided."""
87 _has_dcolon=re.compile(
'\A.*::.*\Z')
91 _has_dstar=re.compile(
'\A.*\*\*.*\Z')
95 TASK_CATEGORY=
'**task**'
102 """Constant for use in Task.state: indicates a run was attempted but
108 """Constant for use in Task.state: indicates a run was never
109 attempted, or a run was attempted but the task was cleaned up."""
114 """Constant for use in Task.state: indicates the Task is presently
123 """Constant for use in Task.state: indicates the Task was running but
124 exited prematurely. Practically speaking, there is no way to tell the
125 difference between RUNNING and PARTIAL since the job cannot ensure
126 that it resets the state before unexpectedly exiting."""
131 """Constant for use in Task.state: indicates the task ran to
132 completion successfully."""
135 """!Stores information about Datum objects in a database.
137 Stores and retrieves Datum objects from an sqlite3 database. Uses
138 file locking workarounds for bugs in RedHat Enterprise Linux's
139 sqlite3 library, which is compiled without proper locking. Has
140 support for caching, and multithreading. Each object in this
141 database has a type, a string location, an integer "available"
142 parameter, and an arbitrary list of (key,value) metadata pairs.
143 This object can safely be accessed by multiple threads in the
144 local process, and handles concurrency between processes via file
146 def __init__(self,filename,logger=None,locking=True):
147 """!Datastore constructor
149 Creates a Datastore for the specified sqlite3 file. Uses the
150 given logging.Logger object for logging messages. Set
151 locking=False to disable all file locking. That is generally
152 unwise, and should only be used when reading the database.
153 That functionality is supplied, and critical, for monitoring
154 another user's jobs. One cannot lock another user's file, so
155 the "no locking" option is the only way to analyze the other
157 @param filename the filename passed to sqlite3.connect
158 @param logger a logging.Logger to use for logging messages
159 @param locking should file locking be used? It is unwise to
160 turn off file locking.
161 @warning Setting locking=False will disable file locking at
162 both the Datastore level, and within sqlite3 itself. This
163 can lead to database corruption if two processes try to
164 write at the same time. This functionality is provided
165 for the rare situation where you are unable to write to
166 a database, such as when reading other users' sqlite3
175 lockfile=filename+
'.lock'
176 if logger
is not None:
177 logger.debug(
'Lockfile is %s for database %s'%(lockfile,filename))
179 lockfile,logger=logger,max_tries=300,sleep_time=0.1,first_warn=50)
189 def _connection(self):
190 """!Gets the current thread's database connection. Each thread
191 has its own connection."""
192 tid=threading.current_thread().ident
200 @contextlib.contextmanager
202 """!Gets the transaction stack for the current thread."""
203 tid=threading.current_thread().ident
207 """!Acquires the database lock for the current thread."""
209 self._db_lock.acquire()
211 self._file_lock.acquire()
213 self._db_lock.release()
216 """!Releases the database lock from the current thread. If the
217 current thread does not have the lock, the results are
220 self._file_lock.release()
221 self._db_lock.release()
226 """!Starts a transaction on the database in the current thread."""
228 def _createdb(self,con):
229 """!Creates the tables used by this Datastore.
231 Runs "CREATE TABLE" commands in the sqlite3 database to create
232 all tables needed by this class. This code must be executed
233 inside a transaction() and should only be executed on initial
234 opening of the file, in the Datastore constructor. It is safe
235 to run this command twice on the same file --- the "IF NOT
236 EXISTS" SQLite modifier is used to ensure the table will not
238 con.execute(
'''CREATE TABLE IF NOT EXISTS products ( id TEXT NOT NULL, available INTEGER DEFAULT 0, location TEXT DEFAULT "", type TEXT DEFAULT "Product", PRIMARY KEY(id))''')
239 con.execute(
'''CREATE TABLE IF NOT EXISTS metadata ( id TEXT NOT NULL, key TEXT NOT NULL, value TEXT, CONSTRAINT id_metakey PRIMARY KEY(id,key))''')
240 con.execute(
'''CREATE TABLE IF NOT EXISTS workers ( info TEXT NOT NULL, lastseen INTEGER NOT NULL)''')
242 """!Print database contents to the terminal.
244 This function is only meant for debugging. It dumps to the
245 terminal an arguably human-readable display of the complete
246 database state via the print command."""
248 products=t.query(
'SELECT id,available,location,type FROM products')
249 meta=t.query(
'SELECT id,key,value FROM metadata')
250 print 'TABLE products:'
251 taskmap={UNSTARTED:
'UNSTARTED',FAILED:
'FAILED',RUNNING:
'RUNNING',
252 PARTIAL:
'PARTIAL',COMPLETED:
'COMPLETED'}
254 (what,avail,loc,typ)=row
255 if typ==
'Task' and avail
in taskmap:
256 print "id=%s available=%s (%s) location=%s type=%s" % \
257 ( what,avail,taskmap[avail],loc,typ )
259 print "id=%s available=%s (%s) location=%s type=%s" % \
260 ( what,avail,repr(bool(avail)),loc,typ )
262 print "id=%s available=%s location=%s type=%s" % \
264 print 'TABLE metadata:'
266 print '%s[%s]=%s' % row
271 """!Datastore transaction support.
273 Implements sqlite3 transactions on a Datastore. A transaction is
274 a set of reads and updates that should either ALL be done, or NONE
275 at all. Transactions also speed up the script, sometimes by as
276 much as a factor of 300, by grouping I/O operations to the
277 database into one large chunk. However, one must be careful in
278 using them since it keeps the database locked for an extended
281 This class should not be used directly. Instead, one should do
282 this to a Datum (Task or Product) object:
284 with datum_object.transaction() as t:
285 ... do things to the datum object ...
286 transaction is now complete, database is updated."""
288 """!Transaction constructor.
290 Creates the Transaction object but does NOT initiate the
295 """!Locks the database for the current thread, if it isn't
297 with self.ds._mystack()
as s:
304 """!Releases the database lock if this is the last Transaction
305 released for the current thread.
306 @param etype,evalue Exception type and value, if any.
307 @param traceback Exception traceback, if any."""
308 with self.ds._mystack()
as s:
309 assert(s.pop()
is self)
312 self.ds._connection().commit()
315 """!Performs an SQL query returning the result of cursor.fetchall()
316 @param stmt the SQL query
317 @param subvals the substitution values"""
318 cursor=self.ds._connection().execute(stmt,subvals)
319 return cursor.fetchall()
321 """!Performs an SQL database modification, returning the result
323 @param stmt the SQL query
324 @param subvals the substitution values"""
325 cursor=self.ds._connection().execute(stmt,subvals)
326 return cursor.lastrowid
328 """!Add a Datum to the database if it is not there already.
330 Given a Datum, add the object to the database if it is not
331 there already and fill the object with data from the database.
333 @param meta If True, also initialize metadata."""
334 prodtype=type(d).__name__
335 av = d._meta[
'available']
if (
'available' in d._meta)
else 0
336 loc = d._meta[
'location']
if (
'location' in d._meta)
else ''
338 self.
mutate(
'INSERT OR IGNORE INTO products VALUES (?,?,?,?)',(d.did,av,loc,prodtype))
339 if loc
is not None and loc!=
'':
344 self.
query(
'SELECT id,location FROM products WHERE id = ?',(d.did,)):
346 if did==d.did
and (oloc
is None or oloc==
''):
347 self.
mutate(
'UPDATE products SET location=? WHERE id=?',
351 if meta
and d._meta
is not None and d._meta:
352 for k,v
in d._meta.iteritems():
353 if k!=
'location' and k!=
'available':
354 self.
mutate(
'INSERT OR IGNORE INTO metadata VALUES (?,?,?)',(d.did,k,v))
358 """!Update database availability and location records.
360 Given a Datum whose location and availability is set, update
361 that information in the database, adding the Datum if
363 @param d the Datum"""
364 loc=str(d._meta[
'location'])
365 av=int(d._meta[
'available'])
366 self.
mutate(
'INSERT OR REPLACE INTO products VALUES (?,?,?,?)',
367 (d.did,av,loc,type(d).__name__))
369 """!Replace Datum metadata with database values, add new metadata to database.
371 Given a Datum d, discards and replaces d._meta with the
372 current metadata, location and availability. Will raise an
373 exception if the product does not exist in the database.
375 @param or_add If True, then any metadata that does not exist in the
376 database is created from values in d."""
379 for (did,av,loc)
in \
380 self.
query(
'SELECT id, available, location FROM products WHERE id = ?',(d.did,)):
392 for (did,k,v)
in self.
query(
'SELECT id, key, value FROM metadata WHERE id = ?',(d.did,)):
396 """!Sets metadata key k to value v for the given Datum.
398 Modifies the database entries for key k and datum d to have
399 the value v. If k is location or available, then the product
400 table will be updated instead.
402 @param k The metadata key.
403 @param v The value, a string."""
405 self.
mutate(
'UPDATE OR IGNORE products SET location = ? WHERE id = ?',(v,d.did))
407 self.
mutate(
'UPDATE OR IGNORE products SET available = ? WHERE id = ?',(int(v),d.did))
409 self.
mutate(
'INSERT OR REPLACE INTO metadata VALUES (?,?,?)',(d.did,k,v))
411 """!Delete metadata from the database.
413 Deletes the specified metadata key k, which must not be
414 "location" or "available".
415 @param d The Datum whose metadata is being deleted.
416 @param k The metadata key, which cannot be "available" or "location"."""
417 assert(k !=
'available' and k !=
'location')
418 self.
mutate(
'DELETE FROM metadata WHERE id=? AND key=?',(d.did,k))
427 """!Superclass of anything that can be stored in a Datastore.
429 This is the superclass of anything that can be placed in a
430 datastore. It has a category, a product name (prodname for
431 short), a location, availability (an int) and arbitrary metadata
432 (key,value) pairs. It caches database metadata in self._meta,
433 which is directly accessed by the Datastore class. Cache data
434 will be discarded once its age is older than self._cacheage."""
435 def __init__(self,dstore,prodname,category,meta=None,cache=30,location=None,**kwargs):
436 """!Datum constructor.
438 Creates a Datum in the given Datastore dstore, under the
439 specified category and product name prodname. The datastore
440 id used is "category::prodname". The value for "cache" is the
441 number of seconds to retain cached metadata before going back
442 to disk to reread it. That only applies to data "get"
443 operations: setting a data or metadata value will cause an
444 immediate write to the database. Also, __contains__ ("var" in
445 self) will force a fetch from the database if the requested
446 metadata variable is not in the cached copy of the database.
448 Values for location and meta are only the DEFAULT values, and
449 will be ignored if other values are already set in the
450 database. The location is only used if the product is not
451 already in the database: its location will be set to the
452 provided values upon entry. Similarly, the metadata is only
453 set in this call if there isn't already metadata for the
454 product with the given metadata keys.
455 @param dstore The Datastore for this Datum.
456 @param prodname The product name portion of the Datum ID
457 @param category The category part of the Datum ID
458 @param meta A dict of metadata values.
459 @param cache Metadata cache lifetime in seconds.
460 @param location The initial value for location, if it is not set already in the database.
461 @param kwargs Ignored."""
464 (self._dstore,self._prodname,self._category) = (dstore,str(prodname),str(category))
465 self.
_id=self._category+
'::'+self._prodname
474 self.
_meta=dict(meta)
475 if location
is not None:
476 self.
_meta[
'location']=str(location)
477 if 'available' in self.
_meta:
478 self.
_meta[
'available']=int(self.
_meta[
'available'])
479 self.
_lock=threading.RLock()
480 with self._dstore.transaction()
as t:
485 """!Acquires this object's thread lock. This is used to manage cached data."""
489 """!Releases this object's thread lock. This is used to manage cached data.
490 @param etype,evalue,traceback Exception information."""
494 """!Validates this object's Datastore, prodname and category."""
495 if _has_dcolon.match(self._prodname):
496 raise InvalidID(
'%s: the prodname cannot contain a double colon (::)'%(self.
_id))
497 if _has_dcolon.match(self._category):
498 raise InvalidID(
'%s: the category cannot contain a double colon (::)'%(self.
_id))
502 """!Returns the database ID of this datum."""
505 """!Returns the datastore of this datum."""
508 """!Creates, but does not lock, a Transaction for this datum's datastore."""
509 return self._dstore.transaction()
511 """!Returns the product type of this Datum.
513 Returns the product type of this Datum. This is generally the
514 name of the Python class that created the entry in the
516 return type(self).__name__
518 """!Returns the product name part of the database ID."""
519 return self._prodname
521 """!Returns the product category part of the database ID."""
522 return self._category
525 """!Returns the "location" field of this Datum's database entry."""
526 return self[
'location']
528 """!Sets the "location" field of this Datum's database entry.
529 @param v the new location"""
# Read-only and read/write property aliases for the accessor
# functions.  Only the getter (and, for location, the setter) is
# supplied; no deleter is installed for any of them.
prodname=property(
    fget=getprodname,
    doc="""the product name (read-only)""")

category=property(
    fget=getcategory,
    doc="""the category (read-only)""")

prodtype=property(
    fget=getprodtype,
    doc="""Returns the prodtype for this Datum: its class name (read-only)""")

did=property(
    fget=getid,
    doc="""Returns the database id for this Datum (read-only)""")

dstore=property(
    fget=getdatastore,
    doc="""Gets the Datastore object that contains this Datum (read-only)""")

location=property(
    fget=getlocation,
    fset=setlocation,
    doc="""The location of this product (read/write)""")
567 """!Integer hash function."""
568 return hash(self._prodname)^hash(self._category)
570 """!Human-readable description of this Datum."""
573 """!Python code-like description of this Datum."""
574 return '%s(%s,%s,%s)' % \
575 (self.
prodtype,repr(self.
dstore),repr(self._prodname),repr(self._category))
577 """!Compares two Datums' prodnames and categories.
578 @param other the other datum to compare against"""
579 if not isinstance(other,Datum):
return NotImplemented
580 c=cmp(self._prodname,other._prodname)
581 c=cmp(self._category,other._category)
if (c==0)
else c
584 """!Sets the location and availability of this Datum in a
586 @param loc the new location, a string
587 @param avail the new availability, an int"""
589 self.
_meta[
'location']=str(loc)
590 self.
_meta[
'available']=int(avail)
593 def _getcache(self,k=None,force=False):
594 """!Requests or forces a cache update.
595 This is the implementation of metadata/location/available
596 caching. It returns self._meta if the cache has not aged out
597 (and k, if provided, is in self._meta) or goes to the
598 Datastore to update the cache, and then returns the resulting
599 self._meta. This MUST be called from within a "with self".
600 @param k The key of interest.
601 @param force If True, forces a cache update even if the
602 cache is not expired."""
603 logger=self.dstore._logger
608 if k
is None or k
in self.
_meta:
615 """!Discards all cached metadata and refreshes it from the
620 """!Returns the value of the specified metadata key or raises
621 KeyError. Forces a cache update if k is not in the cache."""
626 """!Return the value of a metadata key
628 Returns the value of the specified metadata key or returns
629 default if it is unset. Does NOT force a cache update if k is
630 missing from the cache. To force a cache update, use
632 @param k The key of interest.
633 @param default The value to return if no value is seen.
634 @returns The metadata value or the default."""
637 def get(self,k,default=None):
638 """!Alias for self.meta()
639 Returns the value of the specified metadata key or returns
640 default if it is unset. Does NOT force a cache update if k is
641 missing from the cache. To force a cache update, use
643 @param k The key of interest.
644 @param default The value to return if no value is seen.
645 @returns The metadata value or the default."""
650 """!Sets the value of the specified metadata key.
652 @param v the value"""
658 """!Deletes the specified metadata key, which must not be
659 "available" or "location".
660 @param k the key to delete"""
661 assert(k !=
'available' and k !=
'location')
668 """!Determines if a metadata key is set.
669 @returns True if the specified metadata key is set, and False
670 otherwise. Immediately returns True for 'available' and
671 'location' without checking the metadata cache.
672 @param k the key of interest"""
673 if k==
'available' or k==
'location':
678 """!Iterates over all metadata (key,value) pairs for this
679 Datum, including "available" and "location"."""
682 assert(
'available' in meta)
683 assert(
'location' in meta)
684 yield 'available',meta[
'available']
685 yield 'location',meta[
'location']
686 for k,v
in meta.iteritems():
687 if k!=
'location' and k!=
'available':
693 """!Exception raised when a Product class encounters
694 exceptions while calling its callback functions in
695 Product.call_callbacks."""
697 """!CallbackExceptions constructor.
698 @param message The beginning of the exception message. Each
699 exception's message will be appended to this.
700 @param exlist The list of exceptions."""
704 message+=
"\n %s"%(str(ex),)
705 Exception.__init__(self,message,exlist)
717 """!A piece of data produced by a Task.
719 A Product is a piece of data that can be produced by a Task. Once
720 the product is available, self.available or self.is_available()
721 will be True, and the self.location will be valid. The meaning of
722 self.location is up to the implementer to decide, but it should be
723 a full path to a location on disk for file products. As with all
724 Datum objects, a Product also has arbitrary metadata."""
726 """!Adds a delivery callback function.
728 Adds a delivery callback that is called when the product is
729 delivered. This is intended to do such tasks as running an
730 NCO dbn_alert, or copying to a website, or emailing someone.
731 This function is only added in this local Python instance, not
732 in the database file. Also, it is the responsibility of the
733 subclasses to call self.call_callbacks() from self.deliver()
734 to ensure the callbacks are run.
738 def callback(name,*args,**kwargs):
739 print "My fancy product %s was delivered!"%(name,)
740 product.add_callback(callback,[product.prodname])
743 @param callback The callback function, which must be able to
744 take any keyword or indexed arguments.
745 @param args The indexed arguments to send.
746 @param states Presently unused."""
751 calldata=[callback,largs]
752 if '_callbacks' not in self.__dict__: setattr(self,
'_callbacks',list())
753 self._callbacks.append(calldata)
755 """!Returns True if this Product has any callback functions
756 and False otherwise"""
757 if '_callbacks' not in self.__dict__:
return False
758 return len(self._callbacks)>0
760 """!Calls all delivery callback functions.
762 Calls all data delivery callbacks for this Product. Collects
763 any raised Exception subclasses until after all callbacks are
764 called. Will raise CallbackExceptions if any exceptions are
767 Subclasses should call this from either check, or deliver, as
768 appropriate for the product type.
769 @param logger Optional: the logging.Logger for logging messages."""
770 if '_callbacks' not in self.__dict__:
return
771 if not self._callbacks:
return
772 if logger
is None and len(self._callbacks)>0:
773 logger=logging.getLogger(
'produtil.datastore')
776 for (callback,args)
in self._callbacks:
778 callback(*args,**meta)
779 except Exception
as e:
780 if exlist
is None: exlist=list()
782 if logger
is not None:
783 logger.warning(str(e),exc_info=
True)
784 if exlist
is not None:
787 """!Asks the product to check its own availability and update
790 Checks to see if this product is available. This is generally
791 not a cheap operation, as it can take seconds or minutes and
792 may fail. One should call "available" instead if cached
793 information is sufficient.
794 @param kwargs Additional keyword arguments are unused. This is
795 for use by subclasses."""
799 """!Asks the Product to deliver itself.
801 Delivers a product to its destination. This is not
802 implemented by the base class. Note that this is generally an
803 expensive operation which may take seconds or minutes, and may
804 fail. It may involve copying many files, network access, or
805 even pulling tapes from a silo. In the end, the location and
806 availability are expected to be updated in the database.
807 @param kwargs Unused, to be used by subclasses.
808 @post available=True and location is non-empty."""
811 """!"Undelivers" a product.
813 The meaning of this function is implementation-dependent: it
814 could mean deleting an output file, or any number of other
815 actions. Regardless, it should result in self.available=False
816 or an exception being thrown. Note that this is generally an
817 expensive operation that could take seconds or minutes, and
818 may fail. The default implementation simply sets available to
821 @post available=False"""
824 """!Sets the availability to the specified value.
826 Sets the "available" attribute of this Product in the database
827 after converting the given value to a bool and then int
829 @param val the new availability"""
830 self[
'available']=int(bool(val))
832 """!Is the product available?
834 Returns the "available" attribute of this Product in the
835 database, converted to a boolean value via bool()"""
836 return bool(int(self[
'available']))
# Read/write property: reads go through is_available, writes through
# setavailable; no deleter is installed.
available=property(
    fget=is_available,
    fset=setavailable,
    doc="""The availability of this product as a bool (read/write)""")
844 """!Validates this object's Datastore, prodname and category.
846 Validates the Datastore, prodname and category of this
847 Product. In addition to the requirements made by Datum, this
848 function requires that the category not contain any double
850 if _has_dstar.match(self._category):
851 raise InvalidID(
'%s: Product categories cannot contain double stars (**)'%(self.
_id))
857 """!A subclass of Product that represents file delivery.
859 This subclass of Product represents a file that is delivered by
860 this workflow. The deliver() subroutine actually copies the file,
861 and undeliver() deletes it. The produtil.fileop.remove_file() and
862 produtil.fileop.deliver_file() are used for this purpose."""
864 """!Undoes the effect of deliver()
866 Sets this Product's available attribute to False. If
867 delete=True, will also delete the specified file.
868 @param delete if True, the file is deleted
869 @param logger a logging.Logger for log messages"""
874 def deliver(self,location=None,frominfo=None,keep=True,logger=None,
876 """!Delivers the file to a destination.
878 Delivers the file to a destination location specified. The
879 origin is in the "frominfo" argument. Delivery is done by
880 produtil.fileop.deliver_file. The keep, copier and logger
881 arguments are passed on unmodified.
882 @param location The new location.
883 @param frominfo Where to get the file from.
884 @param keep If True, the original file is always kept. If False,
885 the original file may be moved to the destination instead of copied.
886 @param logger a logging.Logger for log messages
888 @param copier Passed to the copier argument of
889 produtil.fileop.deliver_file()
890 @post The file is at the location specified, and the database
891 location and availability are updated accordingly."""
892 if not isinstance(frominfo,basestring):
893 raise TypeError(
'FileProduct.deliver requires a string filename as its frominfo argument. You provided an object of type %s.'%(type(frominfo).__name__))
894 if location
is not None and not isinstance(location,basestring):
895 raise TypeError(
'FileProduct.deliver requires a location argument that is None or a string filename. You provided an object of type %s.'%(type(frominfo).__name__))
903 '%s: no location known when delivering product. Specify a '
904 'location to deliver().'%(self.
did))
916 """!Represents a Product created by an external workflow.
918 This subclass of Product represents a file that is created by an
919 external workflow. It implements a check() call that determines
920 if the file is larger than a minimum size and older than a minimum
921 age. Once the file is large enough and old enough, it sets the
922 location and availability. Any calls to undeliver() or deliver()
923 raise InvalidOperation."""
924 def check(self,frominfo=None,minsize=None,minage=None,logger=None):
925 """!Checks the specified file to see if it is available.
927 Looks for the file on disk. Updates the "available" and
928 "location" attributes of this Product. Uses two metadata
929 values to decide whether the file is "available" if it exists:
931 self["minsize"] - minimum size in bytes. Default: 0
932 self["minage"] - minimum age in seconds. Default: 20
934 Both can be overridden by corresponding arguments. Note that
935 one must be careful with the minimum age on poorly-maintained
936 clusters due to clock skews.
937 @param frominfo Optional: where to look for the file instead of
938 looking at self.location
939 @param minsize Optional: the minimum file size in bytes
940 @param minage Optional: the minimum file modification time in seconds.
941 @param logger Optional: a logging.Logger for log messages."""
947 elif not isinstance(loc,basestring):
948 raise TypeError(
'UpstreamFile.check requires a frominfo argument that is either None or a string filename. You provided an object of type %s.'%(type(frominfo).__name__))
949 assert(loc
is not None)
951 minsize=int(self.
get(
'minsize',0))
953 minage=int(self.
get(
'minage',20))
955 min_mtime_age=minage):
966 """!Undelivering an UpstreamFile merely sets the internal
967 "available" flag to False. It does not remove the data."""
def deliver(self,location=None,frominfo=None):
    """!Always raises InvalidOperation.

    An upstream file cannot be delivered by this workflow; the
    upstream workflow must do that.  Call check() instead to see
    if the file has been delivered.
    @param location,frominfo Ignored."""
    message='Internal error: the scripts attempted to deliver an upstream product.'
    raise InvalidOperation(message)
978 renamer_args=
None,action_args=
None,sleeptime=20,
980 """!Waits for products to be available and performs an action on them.
982 Waits for a specified list of products to be available, and
983 performs some action on each product when it becomes available.
984 Sleeps sleeptime seconds between checks. Returns the number of
985 products that were found before the maxtime was reached.
987 @param plist A Product or a list of Product objects.
988 @param logger A logging.Logger object in which to log messages.
989 @param renamer Optional: a function or callable object that
990 provides a new name for each product. This is passed the
991 product, the logger and the contents of *renamer_args.
992 Default: os.path.basename(p.location)
993 @param action Optional: an action to perform on each product.
994 This is passed the product, the output of renamer, the logger
995 and the contents of *action_args. Default: perform no action.
996 @param renamer_args Optional: arguments to renamer.
997 @param action_args Optional: arguments to action.
998 @param sleeptime - after checking availability of all products, if
999 at least one is unavailable, the code will sleep for this much
1000 time before rechecking. Will be overridden by 0.01 if it is
1001 set to something lower than that. Default: 20
1002 @param maxtime - maximum amount of time to spend in this routine
1004 @returns the number of products that became available before the
1005 maximum wait time was hit. """
1007 renamer=
lambda p,l: os.path.basename(p.location)
1008 if isinstance(plist,Product):
1010 if not ( isinstance(plist,tuple)
or isinstance(plist,list) ):
1011 raise TypeError(
'In wait_for_products, plist must be a '
1012 'list or tuple, not a '+type(plist).__name__)
1013 now=int(time.time())
1017 if not isinstance(p,Product):
1018 raise TypeError(
'In wait_for_products, plist must only '
1019 'contain Product objects.')
1020 if renamer_args
is None: renamer_args=list()
1021 if action_args
is None: action_args=list()
1022 logger.info(
'Waiting for %d products.'%(int(len(plist)),))
1023 while len(seen)<len(plist)
and now<start+maxtime:
1024 now=int(time.time())
1026 if p
in seen:
continue
1027 if not p.available: p.check()
1029 logger.info(
'Product %s is available at location %s'
1030 %(repr(p.did),repr(p.location)))
1032 if action
is not None:
1033 name=renamer(p,logger,*renamer_args)
1034 action(p,name,logger,*action_args)
1037 'Product %s not available (available=%s location=%s).'
1038 %(repr(p.did),repr(p.available),repr(p.location)))
1039 now=int(time.time())
1040 if now<start+maxtime
and len(seen)<len(plist):
1041 sleepnow=max(0.01,min(sleeptime,start+maxtime-now-1))
1042 logfun=logger.info
if (sleepnow>=5)
else logger.debug
1043 logfun(
'Sleeping %g seconds...'%(float(sleepnow),))
1044 time.sleep(sleepnow)
1045 logfun(
'Done sleeping.')
1046 logger.info(
'Done waiting for products: found %d of %d products.'
1047 %(int(len(seen)),int(len(plist))))
1053 """!Represents a process or actor that makes a Product.
1055 A Task represents some work that needs to be done to produce
1056 Products. A task has a state (stored in the "available" metadata
1057 attribute), a location, whose meaning is up to the implementer to
1058 decide, and a logger.Logger. As with all Datum subclasses, a Task
1059 also has arbitrary metadata."""
1060 def __init__(self,dstore,taskname,logger=None,**kwargs):
1061 """!Task constructor
1063 Creates a new Task in the given Datastore with the given task name.
1065 @param dstore the Datastore where this Task will live
1066 @param taskname the task name, passed to the Datum as prodname
1067 @param logger a logging.Logger for this task to use for log messages
1068 @param kwargs other keyword arguments are passed to Datum.__init__"""
1070 logger=logging.getLogger(taskname)
1072 Datum.__init__(self,dstore=dstore,prodname=taskname,category=TASK_CATEGORY,**kwargs)
1076 """!returns the jlogfile logger.
1078 Returns a logging.Logger for the jlogfile. The jlogfile is
1079 intended to receive only major errors, and per-job start and
1080 completion information. This is equivalent to simply
1081 accessing produtil.log.jlogger."""
1082 return produtil.log.jlogger
1085 """!same as produtil.log.jlogger.info()
1087 Sends a message to the multi-job shared log file at level INFO.
1089 @param message the message
1090 @param args positional arguments for string replacement
1091 @param kwargs keyword arguments for string replacement."""
1092 produtil.log.jlogger.info(message,*args,**kwargs)
1095 """!Sets the state of this job.
1097 Sets the job state to the specified value. This works by
1098 setting the "available" attribute to the specified integer.
1099 For compatibility with other scripts, this should be FAILED,
1100 UNSTARTED, RUNNING, PARTIAL or COMPLETED.
1101 @param val the new job state, an int"""
1102 self[
'available']=int(val)
1104 """!Returns the job state.
1106 Returns the "available" attribute as an integer. This is used
1107 as the state of the Task. Typically, the return value should
1108 be one of: FAILED, UNSTARTED, RUNNING, PARTIAL, or COMPLETED."""
1109 return int(self[
'available'])
1114 state=property(getstate,setstate,
None,
1115 """The state of this Task as an int (read/write)""")
1119 """!A string representation of the job state."""
1120 s=int(self[
'available'])
1121 if s==FAILED:
return 'FAILED'
1122 elif s==UNSTARTED:
return 'UNSTARTED'
1123 elif s==RUNNING:
return 'RUNNING'
1124 elif s==PARTIAL:
return 'PARTIAL'
1125 elif s==COMPLETED:
return 'COMPLETED'
1126 else:
return 'UNRECOGNIZED(%d)'%s
1128 """!Returns the task name part of the database ID, which is the
1129 same as the prodname."""
1130 return self._prodname
1134 taskname=property(gettaskname,
None,
None,
1135 """!The task name (read-only, same as prodname)""")
1137 """!Iterate over the products this task produces.
1139 Iterates over some or all of the products produced by this
1140 task. The arguments are used to select subsets of the total
1141 set of products. Provide no arguments to get the full list of
1142 products. All subclasses should re-implement this method, and
1143 interpret the arguments in a way that makes sense to that
1144 class. The default implementation returns immediately without yielding any products.
1146 @param args,kwargs Implementation-defined, used by subclasses."""
1148 for x
in []:
yield x
1150 """!Returns the logger object for this task."""
1153 """!Cleans up any unneeded data used by this task.
1155 Subclasses should override this function to clean up any
1156 unneeded temporary files or other unused resources consumed by
1157 the run() function. This default implementation does nothing."""
1160 """!Undoes the effect of run().
1162 Cleans up this Task's work areas, "undelivers" all
1163 deliverables, and makes it look like the task has never been
1164 run. All subclasses should re-implement this method, and must
1165 also "unrun" everything their parent class runs. The default
1166 implementation simply calls self.clean() and sets the state to UNSTARTED.
1168 @post self.state=UNSTARTED"""
1170 self.state=UNSTARTED
1172 """!Performs the work this Task should do and generates all products.
1174 Performs the work that this task is supposed to do. All
1175 subclasses should re-implement this method, and should set the
1176 state to COMPLETED at the end. This implementation simply calls
1177 self.setstate(COMPLETED)
1178 @post self.state=COMPLETED"""
1181 """!Is this task complete?
1183 Returns True if this task's state is COMPLETED, and False otherwise.
1185 return self.
state==COMPLETED
1190 """!True if self.state==COMPLETED, False otherwise."""
1191 return self.
state==COMPLETED
1194 """!Run some of this task's work, deliver some products.
1196 Performs a subset of the work that this task is supposed to do
1197 and returns. This is intended to be used for tasks that can
1198 be broken up into small pieces, such as post-processing all
1199 output files from an NWP simulation one by one. The default
1200 implementation simply calls self.run()"""
def __delitem__(self, k)
Deletes the specified metadata key, which must not be "available" or "location".
This module provides a set of utility functions to do filesystem operations.
def dump(self)
Print database contents to the terminal.
def __init__(self, message, exlist)
CallbackExceptions constructor.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
def __init__(self, dstore, prodname, category, meta=None, cache=30, location=None, kwargs)
Datum constructor.
def __exit__(self, etype, evalue, traceback)
Releases the database lock if this is the last Transaction released for the current thread...
checktime
suggestion of how often to check the lock
prodtype
Read-only property, an alias for getprodtype(), the product type.
def __repr__(self)
String representation of this object.
def transaction(self)
Creates, but does not lock, a Transaction for this datum's datastore.
def clean(self)
Cleans up any unneeded data used by this task.
exlist
The list of exceptions raised.
def meta
Return the value of a metadata key.
def _createdb(self, con)
Creates the tables used by this Datastore.
def validate(self)
Validates this object's Datastore, prodname and category.
Superclass of all exceptions local to produtil.datastore.
Exception raised when a Product class encounters exceptions while calling its callback functions in P...
Handles file locking using Python "with" blocks.
Raised when delivering data, but no location is provided.
A subclass of Product that represents file delivery.
def getcategory(self)
Returns the product category part of the database ID.
def wait_for_products
Waits for products to be available and performs an action on them.
Sets up signal handlers to ensure a clean exit.
def remove_file
Deletes the specified file.
db
The underlying sqlite3 database object.
def validate(self)
Validates this object's Datastore, prodname and category.
def jlogfile(self)
returns the jlogfile logger.
def __setitem__(self, k, v)
Sets the value of the specified metadata key.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def check_file
Determines whether the specified file exists, and meets additional requirements.
def deliver
Delivers the file to a destination.
def getdatastore(self)
Returns the datastore of this datum.
state
Read-write property: the job state.
def update(self)
Discards all cached metadata and refreshes it from the Datastore.
def deliver
Raises InvalidOperation.
def getlocation(self)
Returns the "location" field of this Datum's database entry.
def __init__
Datastore constructor.
Represents a process or actor that makes a Product.
def log(self)
Returns the logger object for this task.
def init_datum
Add a Datum to the database if it is not there already.
def __enter__(self)
Locks the database for the current thread, if it isn't already locked.
owninfo
implementation-defined information about the owner
def __exit__(self, etype, evalue, traceback)
Releases this object's thread lock.
def getprodname(self)
Returns the product name part of the database ID.
def __cmp__(self, other)
Compares two Datums' prodnames and categories.
def __repr__(self)
Python code-like description of this Datum.
messagebase
The original message sent to the constructor before per-exception messages were appended.
A piece of data produced by a Task.
def add_callback
Adds a delivery callback function.
def completed(self)
Read-only property: is this task completed? Same as is_completed()
def get
Alias for self.meta(). Returns the value of the specified metadata key or returns default if it is uns...
ownseen
last time the owner checked in
location
Read-write property, an alias for getlocation() and setlocation().
def setstate(self, val)
Sets the state of this job.
def del_meta(self, d, k)
Delete metadata from the database.
def getprodtype(self)
Returns the product type of this Datum.
def refresh_meta
Replace Datum metadata with database values, add new metadata to database.
def __init__(self, dstore, taskname, logger=None, kwargs)
Task constructor.
ownlegacy
length of time the lock is valid
This is a fake exception used to get a stack trace.
def is_available(self)
Is the product available?
def transaction(self)
Starts a transaction on the database in the current thread.
def __init__(self, did, owner, owninfo, ownseen, ownlegacy, checktime)
DatumLockHeld constructor.
Raised when a Datum or subclass receives a prodname or category name that is invalid.
def strstate(self)
A string representation of the job state.
Datastore transaction support.
def gettaskname(self)
Returns the task name part of the database ID, which is the same as the prodname. ...
Raised when an invalid Datum operation is requested, such as delivering an UpstreamProduct.
def undeliver(self, kwargs)
"Undelivers" a product.
Stores information about Datum objects in a database.
def getid(self)
Returns the database ID of this datum.
Superclass of anything that can be stored in a Datastore.
available
Read-write property: is the product available?
def set_loc_avail(self, loc, avail)
Sets the location and availability of this Datum in a single transaction.
def _getcache
Requests or forces a cache update.
def deliver(self, kwargs)
Asks the Product to deliver itself.
def _connection(self)
Gets the current thread's database connection.
filename
The path to the sqlite3 database file.
def getstate(self)
Returns the job state.
def __enter__(self)
Acquires this object's thread lock.
did
Read-only property, an alias for getid().
Raised when a LockDatum is held by another Worker.
Automates locking of a lockfile.
def runpart(self)
Run some of this task's work, deliver some products.
def query
Performs an SQL query returning the result of cursor.fetchall()
def is_completed(self)
Is this task complete?
def iteritems(self)
Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location"...
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
def products(self, args, kwargs)
Iterate over the products this task produces.
owner
The owner of the lock.
def call_callbacks
Calls all delivery callback functions.
did
The database ID of the datum whose lock is held.
def has_callbacks(self)
Returns True if this Product has any callback functions and False otherwise.
def __init__(self, ds)
Transaction constructor.
def unrun(self)
Undoes the effect of run().
def __contains__(self, k)
Determines if a metadata key is set.
def undeliver(self)
Undelivering an UpstreamFile merely sets the internal "available" flag to False.
def __str__(self)
Human-readable representation of this exception.
def setlocation(self, v)
Sets the "location" field of this Datum's database entry.
def undeliver
Undoes the effect of deliver()
def check(self, kwargs)
Asks the product to check its own availability and update the database.
def set_meta(self, d, k, v)
Sets metadata key k to value v for the given Datum.
def __hash__(self)
Integer hash function.
def __str__(self)
Human-readable description of this Datum.
def update_datum(self, d)
Update database availability and location records.
def __getitem__(self, k)
Returns the value of the specified metadata key or raises KeyError.
def setavailable(self, val)
Sets the availability to the specified value.
def run(self)
Performs the work this Task should do and generates all products.
def mutate
Performs an SQL database modification, returning the result of cursor.lastrowid.
def check
Checks the specified file to see if it is available.
Represents a Product created by an external workflow.