1 """!Declares GRIBTask, which automates regribbing operations.
3 This module contains the GRIBTask, which is an HWRFTask that runs the
4 regribbing described in an hwrf.regrib.RegribMany object."""
10 import os, os.path, time, socket, re, collections
17 from hwrf.regrib import GRIBBase, GRIBOp, RegribMany, GRIB2Op
29 """!Creates a collections.defaultdict that has a default value of
30 None for any key that has no value."""
31 return collections.defaultdict(nonemaker)
34 """!A Python "with" block construct that does nothing."""
36 """!Do nothing at entry to a "with" block."""
38 """!Do nothing upon exiting a "with" block.
39 @param etype,evalue,traceback exception information to ignore."""
44 SHOULD_BE_MADE=object()
47 """!An hwrf.hwrftask.HWRFTask that performs regribbing operations.
49 An HWRFTask that runs regribbing for a list of input/output times.
50 This class keeps track of many different grids and grib files,
51 each of which is given a name. The regribbing operations are
52 specified in an hwrf.regrib.RegribMany object. It is possible for
53 multiple jobs to run the same GRIBTask at the same time: the task
54 uses lock files and a produtil.datastore.Datastore to communicate
56 def __init__(self,dstore,conf,section,regribmany,start,end,step,
58 """!Creates a new GRIBTask:
59 @param dstore the produtil.datastore.Datastore to use
60 @param conf the HWRFConfig to use for configuration options.
61 This conf is passed down to the RegribMany during
62 regribbing operations.
63 @param section the config section to use.
64 @param regribmany the hwrf.regrib.RegribMany that describes
65 the regribbing operations
66 @param start,end,step the start and end times and timestep for
67 the list of times to regrib.
68 @param atime the analysis time of the simulation. This is needed
69 by some of the regribbing operations. Note that this
70 analysis time may differ from the conf.cycle, especially for
71 data assimilation first guess jobs.
72 @param kwargs passed to the HWRFTask constructor"""
74 assert(regribmany.has_deliveries())
75 super(GRIBTask,self).
__init__(dstore,conf,section,**kwargs)
77 self.
start=self._regribstore.start
78 self.
end=self._regribstore.end
86 for time
in self._subtasks.times():
88 taskname=
'%s.f%02dh%02dm' % ( self.
taskname, ihr, imin )
90 taskname=taskname,logger=logger)
95 regribmany.logger=self.
log()
114 """!Create a time-indexed array of None.
116 Creates a new hwrf.numerics.TimeArray that maps from list of
117 times to be processed to None."""
120 """!Create an array of times to be processed.
122 Returns a new hwrf.numerics.TimeArray for the list of times
123 to be processed. Unlike make_time_array, this TimeArray will
124 assume any times that don't have data assigned have an empty
128 def get_data(self,name,time=None,update=False,**kwargs):
129 """!Returns intermediate results.
131 Used by RegribMany to retrieve intermediate results. The
132 extra update flag is not used by RegribMany, but rather used
133 by GRIBTask.run() to force a recheck of the database to see if
134 another GRIBTask has completed a regribbing operation by
136 @param name the name of the result type
137 @param time the time at which the result is requested
138 @param update if True, call update() on the result
139 @param kwargs unused"""
140 assert(time
is not None)
141 if time
is None:
return None
145 if got
is SHOULD_BE_MADE:
147 if isinstance(got,Product):
149 if not av
and update:
152 if not av:
return None
153 strtime=time.strftime(
'%Y%m%d.%H%M%S')
154 self.
log().info(
'recalling result - %s %s'%(strtime,name))
156 def set_data(self,name,data,time=None,logger=None,**kwargs):
157 """!Stores intermediate results.
159 Used by RegribMany to store intermediate results. Will also
160 deliver any output products.
161 @param name the name of the result being set
162 @param data the data, any implementation-defined object. If
163 this is a product that is delivered, it is sent to the
164 frominfo= argument of Product.deliver()
165 @param time the time at which the result is set
166 @param logger a logging.Logger to use for logging messages
167 @param kwargs ignored"""
168 assert(time
is not None)
169 if logger
is None: logger=self.
log()
170 strtime=time.strftime(
'%Y%m%d.%H%M%S')
171 if time
is None:
return
172 logger.info(
'result obtained - %s %s'%(strtime,name))
178 'result obtained - %s %s: deliver (keep=%s) '
179 'destination = %s frominfo = %s'
180 %(name,strtime,repr(keep),repr(mydata.location),repr(data)))
181 mydata.deliver(frominfo=data,keep=keep)
184 for product
in products:
186 logger.warning(
'%s %s: also deliver to %s'%
187 (name,strtime,product.location))
188 product.deliver(frominfo=data)
190 logger.info(
'result obtained - %s %s: non-product '
191 'result.'%(strtime,name))
197 """!Creates Product objects.
199 Called automatically from run(), this creates Product objects
200 for all output products. Note that this is potentially an
201 expensive function if the gribtask has a large number of
204 deliveries=collections.defaultdict(list)
211 for name,op
in self._rm.GRIBOps():
213 if isinstance(op,GRIB2Op):
219 for name,op
in self._rm.nonGRIBOps():
224 for (where,name,loc,cat,prod,keep)
in self._rm.deliveries():
225 deliveries[name].append([where,loc,cat,prod,keep])
226 if name
not in regribset:
228 self._regribnames.append(name)
233 for time
in self._regribstore.times():
234 fcststr=time.strftime(
'%Y%m%d.%H%M%S')
236 if self._rm.input_valid(name,time=time,task=self):
237 logger.debug(
'%s: make non-file %s at this time'
241 if not self._rm.input_valid(name,time=time,task=self):
243 logger.debug(
'%s: make file %s at this time'%(fcststr,name))
249 if name
in deliveries:
250 dlist=deliveries[name]
251 if dlist
is None or len(dlist)!=1:
254 product=make_product(ptype,name,fcststr,time,
256 self._all_products.add(product)
258 if dlist
is not None:
259 for where,loc,cat,prod,keep
in dlist:
260 product=make_product(ptype,name,fcststr,time,
262 self._all_products.add(product)
266 (where,loc,cat,prod,keep)=dlist[0]
267 product=make_product(ptype,name,fcststr,time,
268 where=where,loc=loc,cat=cat,prod=prod)
269 self._all_products.add(product)
272 logger.debug(
'%s: final list contains: %s'%(fcststr,name))
274 def products(self,name=None,time=None,yieldtime=False):
275 """!Iterates over products:
277 Loops over all products that meet the specifications, yielding
279 @param name the name of the products (same name as in the
281 @param time the forecast time of the product
282 @param yieldtime if True, instead of iterating over products, this
283 will iterate over tuples containing the time and product."""
285 if yieldtime
or name
is not None:
286 neartime=self._regribstore.neartime(time)
290 if found
is None:
return
299 yield neartime,product
303 for rtime
in self._regribstore.datatimes():
320 def _make_product(self,cls,name,fcststr,time,where=None,loc=None,
322 """!implementation of make_products
324 The underlying implementation of make_products, this makes
326 @param cls the class to create. This is a constructor which will
329 cls(self.dstore,category=cat,prodname=prod,location=loc)
332 @param name the name of the product from the RegribMany
333 @param fcststr a string representation of the forecast time, used
334 for logging and product location generation
335 @param time the forecast time
336 @param where "com" or "intercom". Default: "intercom"
337 @param loc the product location. A reasonable default is chosen
338 if this is unspecified
339 @param cat the product category. Default: self.taskname
340 @param prod the product's prodname. Default: name."""
341 if where
is None: where=
'intercom'
347 assert(where==
'intercom')
348 loc=
'%s/%s.%s'%(self.
taskname,name,fcststr)
352 prod=
'%s/%s'%(where,loc)
355 loc=os.path.join(self.conf.getdir(where),loc)
356 assert(loc
is not None and loc!=
'')
357 assert(cat
is not None and cat!=
'')
358 assert(prod
is not None and prod!=
'')
359 return cls(self.
dstore,category=cat,prodname=prod,location=loc)
364 """!Marks all tasks as unstarted and products as unavailable.
366 Marks this task and all subtasks as incomplete, ensuring that
367 the next call to run or runpart will attempt to produce any
368 products that are not delivered. All products that are
369 "available" but are not on disk are marked as unavailable."""
371 with self.dstore.transaction():
374 logger.critical(
'Somehow ended up with a non-Task '
375 'in the array of tasks: %s'
380 if not product.available:
381 logger.info(
'%s: not available'%(product.did,))
386 logger.warning(
'%s: no file at %s'%(
390 logger.info(
'%s: already at %s'%(product.did,loc))
392 logger.warning(
'%s: no location, but set to available'
394 product.available=
False
398 """!Calls the Product.call_callback function for all completed
399 and delivered products."""
401 logger.warning(
'Calling all callbacks for products that have '
402 'already been completed.')
408 if product.available:
411 if product.has_callbacks():
413 logger.info(
'%s: Calling all callbacks for this '
414 'product. Product location is %s'
415 %(product.did, product.location))
416 product.call_callbacks()
418 logger.info(
'%s: Already completed, but has no '
419 'callbacks. Product location is %s'
420 %(product.did, product.location))
422 logger.info(
'%s: Product has no location. I will not '
423 'call its callbacks.'%(product.did,))
424 logger.warning(
'Done calling callbacks for already completed '
426 logger.warning(
'Called %d callbacks out of %d completed products, '
427 'of %d total products for this job.'
428 %(ncalled,ncomplete,ntotal))
431 """!Deletes all output products, and marks this task and all
432 subtasks as incomplete."""
433 with self.dstore.transaction():
435 assert(product.location !=
'')
437 assert(product.location !=
'')
443 def run_helper(self,one=False,worker=None,raiseall=False,now=False):
444 """!Internal function that underlies run() and runpart()
446 Called from run() and runpart() to perform the actual work.
447 @param one True for runpart(), False for run(). If True, exits
448 after one product is processed.
449 @param worker obsolete and ignored
450 @param raiseall If true, nearly all exceptions are raised.
451 @param now If true, the function will not sleep or wait."""
456 waitsleep=self.
confint(
'waitsleep',10)
458 readysleep=self.
confint(
'readysleep',2)
465 fail_counts=collections.defaultdict(
lambda: 0)
468 for rtime
in self._regribstore.times():
469 strtime=rtime.strftime(
'%Y%m%d.%H%M%S')
470 logger.info(
'%s: will process time %s'%(self.
taskname,strtime))
473 first_time_through=
True
477 attempted_something=
True
486 attempted_something=
False
489 'Gave up: more than 100 regribbing operations failed.')
491 if not first_time_through:
492 sleeptime = waitsleep
if notready
else readysleep
493 loggify = logger.warning
if notready
else logger.info
494 loggify(
'sleep %d...'%(sleeptime,))
495 time.sleep(sleeptime)
496 loggify(
'done sleeping.')
497 first_time_through=
False
500 with self.dstore.transaction():
501 for rtime
in self._regribstore.times():
503 if not subtask.is_completed():
505 if not subtask.is_completed():
506 nincomplete.append( (subtask,rtime) )
507 if len(nincomplete)>=first_n:
break
508 logger.warning(
'Considering these times: %s'%
509 ', '.join([ t.strftime(
'%Y%m%d-%H%M%S') \
510 for (s,t)
in nincomplete ]))
511 if len(nincomplete)<=0:
512 logger.warning(
'No subtasks incomplete. I think I am '
513 'done running. Will exit regribber now.')
515 for (subtask,rtime)
in nincomplete:
516 strtime=rtime.strftime(
'%Y%m%d.%H%M%S')
517 logger.info(
'%s: examine this time...'%(strtime,))
518 if subtask.is_completed():
519 logger.info(
'%s: already done.'%(strtime,))
521 regribber=self._rm.copy()
525 for name
in self._rm.names():
526 failkey=(name,strtime)
527 if failkey
in fail_counts:
528 if fail_counts[failkey]>=3:
529 logger.debug(
'%s %s: skip: failed too many times.'
533 logger.info(
'%s %s: fail count is %d'
534 %(strtime,name,fail_counts[failkey]))
536 logger.info(
'%s: consider product %s'%(strtime,name))
538 logger.info(
'%s: no %s product at this time.'
542 if self.
get_data(name,rtime)
is not None:
543 logger.info(
'%s %s: already done'%(name,strtime))
545 if not regribber.is_ready(name,time=rtime,task=self):
546 msg=
'%s: %s: not ready yet.'%(strtime,name)
553 logger.info(
'%s %s: will need lock'
555 lockfile=os.path.join(lockdir,
'%s.%s'
558 filename=lockfile,logger=logger,max_tries=1,
561 (product,locker)=(
None,dummy)
563 attempted_something=
True
565 if self.
get_data(name,rtime,update=
True) \
567 logger.info(
'%s %s: already done'
570 with
TempDir(prefix=
'%s/%s.%s.'
573 keep_on_error=
False):
574 if regribber.is_grid(name):
576 '%s: %s: calculate this grid.'
580 '%s: %s: process this grib1/2 '
581 'product.'%(strtime,name))
582 if regribber.is_ready(name,time=rtime,
584 regribber.make(name,time=rtime,task=self)
585 if self.
get_data(name,rtime)
is None:
586 msg=
'%s %s: somehow regribber.make '\
587 'did not deliver anything'%(name,strtime)
592 logger.info(
'%s %s: internal error: product does '
593 'not exist at this time.'%(name,strtime))
596 logger.info(
'%s %s: lock held, move on.'
599 except Exception
as e:
600 logger.warning(
'%s %s: failed with exception %s'
601 %(name,strtime,str(e)),exc_info=
True)
604 fail_counts[failkey] = fail_counts[failkey]+1
605 if fail_counts[failkey]>=3:
606 logger.error(
'%s %s: failed too many times'
610 logger.info(
'%s: done.'%(strtime,))
611 subtask.state=COMPLETED
614 logger.info(
'%s: not done.'%(strtime,))
615 logger.debug(
'%s: on to next time....'%(strtime,))
618 """!Performs all regribbing, returning when complete.
620 Runs all regribbing. Does not return until all regribbing
621 is complete, or a fatal error happens. It is safe to run this
622 in multiple threads at the same time. Through file locking
623 and database usage, the jobs will work together.
624 @param kwargs keyword arguments passed to run_helper()"""
628 """!Performs a small amount of work and returns.
629 @param kwargs keyword arguments passed to run_helper()"""
def products
Iterates over products:
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
This class represents a GRIB2 file that is produced by this workflow.
def fcst_hr_min(time, start)
Return forecast time in hours and minutes.
end
the end time of the simulation
taskname
Read-only property: the name of this task.
def run(self, kwargs)
Performs all regribbing, returning when complete.
The base class of tasks run by the HWRF system.
start
the start time of the simulation
Raised upon errors that would cause a retry, in the PostOneWRF.run when passed the raiseall=True argu...
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def nonedict()
Creates a collections.defaultdict that has a default value of None for any key that has no value...
Represents a process or actor that makes a Product.
def make_time_array_list(self)
Create an array of times to be processed.
def runpart(self, kwargs)
Performs a small amount of work and returns.
Defines StormInfo and related functions for interacting with vitals ATCF data.
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
A piece of data produced by a Task.
def get
Alias for self.meta() Returns the value of the specified metadata key or returns default if it is uns...
def unrun(self)
Deletes all output products, and marks this task and all subtasks as incomplete.
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Stores products and tasks in an sqlite3 database file.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
This class is intended to be used with the Python "with TempDir() as t" syntax.
def call_completed_callbacks(self)
Calls the Product.call_callback function for all completed and delivered products.
An hwrf.hwrftask.HWRFTask that performs regribbing operations.
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
def __init__(self, dstore, conf, section, regribmany, start, end, step, atime=None, kwargs)
An hwrf.hwrftask.HWRFTask that performs regribbing operations.
def log
Obtain a logging domain.
Parses UNIX conf files and makes the result readily available.
This class represents a GRIB1 file produced by this workflow.
def make_products(self)
Creates Product objects.
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward compatibility.
A time-indexed array that can only handle equally spaced times.
def uncomplete(self)
Marks all tasks as unstarted and products as unavailable.
Raised when an operation that produces input to Regrib should have produced a Product, but produced nothing at all.
atime
the "atime" argument to send to Product.product() functions
A Python "with" block construct that does nothing.
def __enter__(self)
Do nothing at entry to a "with" block.
def _make_product
implementation of make_products
Automates locking of a lockfile.
Runs the Unified Post Processor on outputs from the WRF-NMM, producing E grid GRIB files as EGRIB1Pro...
timestep
the frequency at which GRIB files should be created
Exceptions raised by the hwrf package.
def is_completed(self)
Is this task complete?
def nonemaker()
Returns None.
def iteritems(self)
Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location"...
Describes regribbing operations using an algebraic structure.
Exceptions for hwrf.regrib.GRIBTask for certain internal errors.
def make_time_array(self)
Create a time-indexed array of None.
def __exit__(self, etype, evalue, traceback)
Do nothing upon exiting a "with" block.
def set_data(self, name, data, time=None, logger=None, kwargs)
Stores intermediate results.
def run_helper
Internal function that underlies run() and runpart()
def get_data(self, name, time=None, update=False, kwargs)
Returns intermediate results.