HWRF  trunk@4391
gribtask.py
1 """!Declares GRIBTask, which automates regribbing operations.
2 
3 This module contains the GRIBTask, which is an HWRFTask that runs the
4 regribbing described in an hwrf.regrib.RegribMany object."""
5 
6 ##@var __all__
7 # The list of symbols exported by "from hwrf.gribtask import *"
8 __all__=['GRIBTask']
9 
10 import os, os.path, time, socket, re, collections
11 
16 
17 from hwrf.regrib import GRIBBase, GRIBOp, RegribMany, GRIB2Op
18 from hwrf.numerics import to_datetime
19 from hwrf.exceptions import GribberError
20 from produtil.cd import TempDir, NamedDir
21 from produtil.datastore import Product, COMPLETED, UNSTARTED
22 
23 from produtil.run import run,exe
24 
25 def nonemaker():
26  """!Returns None."""
27  return None
28 def nonedict():
29  """!Creates a collections.defaultdict that has a default value of
30  None for any key that has no value."""
31  return collections.defaultdict(nonemaker)
32 
33 class LockDummy(object):
34  """!A Python "with" block construct that does nothing."""
35  def __enter__(self):
36  """!Do nothing at entry to a "with" block."""
37  def __exit__(self,etype,evalue,traceback):
38  """!Do nothing upon exiting a "with" block.
39  @param etype,evalue,traceback exception information to ignore."""
40 
41 ##@var SHOULD_BE_MADE
42 # A constant used internally in GRIBTask to indicate a product should
43 # be made.
44 SHOULD_BE_MADE=object()
45 
47  """!An hwrf.hwrftask.HWRFTask that performs regribbing operations.
48 
49  An HWRFTask that runs regribbing for a list of input/output times.
50  This class keeps track of many different grids and grib files,
51  each of which is given a name. The regribbing operations are
52  specified in an hwrf.regrib.RegribMany object. It is possible for
53  multiple jobs to run the same GRIBTask at the same time: the task
54  uses lock files and a produtil.datastore.Datastore to communicate
55  between them."""
56  def __init__(self,dstore,conf,section,regribmany,start,end,step,
57  atime=None,**kwargs):
58  """!Creates a new GRIBTask:
59  @param dstore the produtil.datastore.Datastore to use
60  @param conf the HWRFConfig to use for configuration options.
61  This conf is passed down to the RegribMany during
62  regribbing operations.
63  @param section the config section to use.
64  @param regribmany the hwrf.regrib.RegribMany that describes
65  the regribbing operations
66  @param start,end,step the start and end times and timestep for
67  the list of times to regrib.
68  @param atime the analysis time of the simulation. This is needed
69  by some of the regribbing operations. Note that this
70  analysis time may differ from the conf.cycle, especially for
71  data assimilation first guess jobs.
72  @param kwargs passed to the HWRFTask constructor"""
73 
74  assert(regribmany.has_deliveries())
75  super(GRIBTask,self).__init__(dstore,conf,section,**kwargs)
76  self._regribstore=hwrf.numerics.TimeArray(start,end,step,nonedict)
77  self.start=self._regribstore.start
78  self.end=self._regribstore.end
79  self.timestep=self._regribstore.timestep
80  n = len(self._regribstore)
81  self._deliveries=collections.defaultdict(self.make_time_array_list)
82  self._all_products=set()
83  self._regribnames=list()
84  self._subtasks=hwrf.numerics.TimeArray(start,end,step)
85  logger=self.log()
86  for time in self._subtasks.times():
87  (ihr,imin) = hwrf.numerics.fcst_hr_min(time,self.start)
88  taskname='%s.f%02dh%02dm' % ( self.taskname, ihr, imin )
89  self._subtasks[time] = produtil.datastore.Task(self.dstore,
90  taskname=taskname,logger=logger)
91 
92  if atime is not None: self.atime=self.start
93  self._rm=regribmany
94  self.make_products()
95  regribmany.logger=self.log()
96  self.workerdesc=None
97 
98  ##@var start
99  # the start time of the simulation
100 
101  ##@var end
102  # the end time of the simulation
103 
104  ##@var timestep
105  # the frequency at which GRIB files should be created
106 
107  ##@var workerdesc
108  # None (unused)
109 
110  ##@var atime
111  # the "atime" argument to send to Product.product() functions
112 
113  def make_time_array(self):
114  """!Create a time-indexed array of None.
115 
116  Creates a new hwrf.numerics.TimeArray that maps from list of
117  times to be processed to None."""
118  return hwrf.numerics.TimeArray(self.start,self.end,self.timestep)
120  """!Create an array of times to be processed.
121 
122  Returns a new hwrf.numerics.TimeArray for the list of times
123  to be processed. Unlike make_time_array, this TimeArray will
124  assume any times that don't have data assigned have an empty
125  list()."""
126  return hwrf.numerics.TimeArray(self.start,self.end,self.timestep,
127  list)
128  def get_data(self,name,time=None,update=False,**kwargs):
129  """!Returns intermediate results.
130 
131  Used by RegribMany to retrieve intermediate results. The
132  extra update flag is not used by RegribMany, but rather used
133  by GRIBTask.run() to force a recheck of the database to see if
134  another GRIBTask has completed a regribbing operation by
135  sending update=True.
136  @param name the name of the result type
137  @param time the time at which the result is requested
138  @param update if True, call update() on the result
139  @param kwargs unused"""
140  assert(time is not None)
141  if time is None: return None
142  if name not in self._regribstore[time]:
143  return None
144  got=self._regribstore[time][name]
145  if got is SHOULD_BE_MADE:
146  return None
147  if isinstance(got,Product):
148  av=got.available
149  if not av and update:
150  got.update()
151  av=got.available
152  if not av: return None
153  strtime=time.strftime('%Y%m%d.%H%M%S')
154  self.log().info('recalling result - %s %s'%(strtime,name))
155  return got
156  def set_data(self,name,data,time=None,logger=None,**kwargs):
157  """!Stores intermediate results.
158 
159  Used by RegribMany to store intermediate results. Will also
160  deliver any output products.
161  @param name the name of the result being set
162  @param data the data, any implementation-defined object. If
163  this is a product that is delivered, it is sent to the
164  frominfo= argument of Product.deliver()
165  @param time the time at which the result is set
166  @param logger a logging.Logger to use for logging messages
167  @param kwargs ignored"""
168  assert(time is not None)
169  if logger is None: logger=self.log()
170  strtime=time.strftime('%Y%m%d.%H%M%S')
171  if time is None: return
172  logger.info('result obtained - %s %s'%(strtime,name))
173  assert(name in self._regribstore[time])
174  mydata=self._regribstore[time][name]
175  if isinstance(mydata,produtil.datastore.Product):
176  keep = name in self._deliveries
177  logger.warning(
178  'result obtained - %s %s: deliver (keep=%s) '
179  'destination = %s frominfo = %s'
180  %(name,strtime,repr(keep),repr(mydata.location),repr(data)))
181  mydata.deliver(frominfo=data,keep=keep)
182  if name in self._deliveries:
183  products=self._deliveries[name][time]
184  for product in products:
185  if product!=mydata:
186  logger.warning('%s %s: also deliver to %s'%
187  (name,strtime,product.location))
188  product.deliver(frominfo=data)
189  else:
190  logger.info('result obtained - %s %s: non-product '
191  'result.'%(strtime,name))
192  self._regribstore[time][name]=data
193 
194  # --------------------------------------------------------------------------
195 
196  def make_products(self):
197  """!Creates Product objects.
198 
199  Called automatically from run(), this creates Product objects
200  for all output products. Note that this is potentially an
201  expensive function if the gribtask has a large number of
202  products."""
203  category=self.taskname
204  deliveries=collections.defaultdict(list)
205  grib2s=set()
206  grib1s=set()
207  alls=set()
208  nongrib=set()
209  logger=self.log()
210  # Divide up into GRIB1 and GRIB2 products:
211  for name,op in self._rm.GRIBOps():
212  alls.add(name)
213  if isinstance(op,GRIB2Op):
214  grib2s.add(name)
215  else: # assume anything that isn't GRIB2 but is a GRIBOp
216  # is a GRIB1
217  grib1s.add(name)
218 
219  for name,op in self._rm.nonGRIBOps():
220  nongrib.add(name)
221 
222  # Get the list of deliveries to make:
223  regribset=set()
224  for (where,name,loc,cat,prod,keep) in self._rm.deliveries():
225  deliveries[name].append([where,loc,cat,prod,keep])
226  if name not in regribset:
227  regribset.add(name)
228  self._regribnames.append(name)
229 
230  # Now loop over all output times and make Product objects:
231  taskname=self.taskname
232  make_product=self._make_product
233  for time in self._regribstore.times():
234  fcststr=time.strftime('%Y%m%d.%H%M%S')
235  for name in nongrib:
236  if self._rm.input_valid(name,time=time,task=self):
237  logger.debug('%s: make non-file %s at this time'
238  %(fcststr,name))
239  self._regribstore[time][name]=SHOULD_BE_MADE
240  for name in alls:
241  if not self._rm.input_valid(name,time=time,task=self):
242  continue
243  logger.debug('%s: make file %s at this time'%(fcststr,name))
244  if name in grib2s:
246  else:
248  dlist=None
249  if name in deliveries:
250  dlist=deliveries[name]
251  if dlist is None or len(dlist)!=1:
252  # Zero deliveries or multiple deliveries so we
253  # need an intermediate file.
254  product=make_product(ptype,name,fcststr,time,
255  cat=taskname)
256  self._all_products.add(product)
257  self._regribstore[time][name]=product
258  if dlist is not None:
259  for where,loc,cat,prod,keep in dlist:
260  product=make_product(ptype,name,fcststr,time,
261  where,loc,cat,prod)
262  self._all_products.add(product)
263  self._deliveries[name][time].append(product)
264  else: # one delivery, so use that product as the
265  # target location
266  (where,loc,cat,prod,keep)=dlist[0]
267  product=make_product(ptype,name,fcststr,time,
268  where=where,loc=loc,cat=cat,prod=prod)
269  self._all_products.add(product)
270  self._regribstore[time][name]=product
271  for name in self._regribstore[time].iterkeys():
272  logger.debug('%s: final list contains: %s'%(fcststr,name))
273 
274  def products(self,name=None,time=None,yieldtime=False):
275  """!Iterates over products:
276 
277  Loops over all products that meet the specifications, yielding
278  each one in turn.
279  @param name the name of the products (same name as in the
280  RegribMany).
281  @param time the forecast time of the product
282  @param yieldtime if True, instead of iterating over products, this
283  will iterate over tuples containing the time and product."""
284  if time is not None:
285  if yieldtime or name is not None:
286  neartime=self._regribstore.neartime(time)
287  if name is not None:
288  if name not in self._regribstore[neartime]: return
289  found=self._regribstore[neartime][name]
290  if found is None: return
291  if yieldtime:
292  yield neartime,found
293  else:
294  yield found
295  else:
296  for (name,product) in self._regribstore[time].iteritems():
297  if isinstance(product,produtil.datastore.Product):
298  if yieldtime:
299  yield neartime,product
300  else:
301  yield product
302  else:
303  for rtime in self._regribstore.datatimes():
304  if name is not None:
305  if name not in self._regribstore[rtime]: return
306  product=self._regribstore[rtime].get(name,None)
307  if isinstance(product,produtil.datastore.Product):
308  if yieldtime:
309  yield rtime,product
310  else:
311  yield product
312  else:
313  for (name,product) in self._regribstore[rtime].iteritems():
314  if isinstance(product,produtil.datastore.Product):
315  if yieldtime:
316  yield rtime,product
317  else:
318  yield product
319 
320  def _make_product(self,cls,name,fcststr,time,where=None,loc=None,
321  cat=None,prod=None):
322  """!implementation of make_products
323 
324  The underlying implementation of make_products, this makes
325  one product.
326  @param cls the class to create. This is a constructor which will
327  be called like so:
328  @code
329  cls(self.dstore,category=cat,prodname=prod,location=loc)
330  @endcode
331 
332  @param name the name of the product from the RegribMany
333  @param fcststr a string representation of the forecast time, used
334  for logging and product location generation
335  @param time the forecast time
336  @param where "com" or "intercom". Default: "intercom"
337  @param loc the product location. A reasonable default is chosen
338  if this is unspecified
339  @param cat the product category. Default: self.taskname
340  @param prod the product's prodname. Default: name."""
341  if where is None: where='intercom'
342  if cat is None:
343  cat=self.taskname
344  elif '{' in cat:
345  cat=self.conftimestrinterp(cat,time)
346  if loc is None:
347  assert(where=='intercom') # only auto-gen locations in temporary areas?
348  loc='%s/%s.%s'%(self.taskname,name,fcststr)
349  elif '{' in loc:
350  loc=self.conftimestrinterp(loc,time)
351  if prod is None:
352  prod='%s/%s'%(where,loc)
353  elif '{' in prod:
354  prod=self.conftimestrinterp(prod,time)
355  loc=os.path.join(self.conf.getdir(where),loc)
356  assert(loc is not None and loc!='')
357  assert(cat is not None and cat!='')
358  assert(prod is not None and prod!='')
359  return cls(self.dstore,category=cat,prodname=prod,location=loc)
360 
361  # --------------------------------------------------------------------------
362 
363  def uncomplete(self):
364  """!Marks all tasks as unstarted and products as unavailable.
365 
366  Marks this task and all subtasks as incomplete, ensuring that
367  the next call to run or runpart will attempt to produce any
368  products that are not delivered. All products that are
369  "available" but are not on disk are marked as unavailable."""
370  logger=self.log()
371  with self.dstore.transaction():
372  for task in self._subtasks:
373  if not isinstance(task,produtil.datastore.Task):
374  logger.critical('Somehow ended up with a non-Task '
375  'in the array of tasks: %s'
376  %(repr(task),))
377  assert(isinstance(task,produtil.datastore.Task))
378  task.state=UNSTARTED
379  for product in self._all_products:
380  if not product.available:
381  logger.info('%s: not available'%(product.did,))
382  continue
383  loc=product.location
384  if loc!='':
385  if not produtil.fileop.isnonempty(loc):
386  logger.warning('%s: no file at %s'%(
387  product.did,loc))
388  product.undeliver()
389  else:
390  logger.info('%s: already at %s'%(product.did,loc))
391  else:
392  logger.warning('%s: no location, but set to available'
393  %(product.did,))
394  product.available=False
395  self.state=UNSTARTED
396 
398  """!Calls the Product.call_callback function for all completed
399  and delivered products."""
400  logger=self.log()
401  logger.warning('Calling all callbacks for products that have '
402  'already been completed.')
403  ncomplete=0
404  ntotal=0
405  ncalled=0
406  for product in self._all_products:
407  ntotal+=1
408  if product.available:
409  ncomplete+=1
410  if product.location:
411  if product.has_callbacks():
412 
413  logger.info('%s: Calling all callbacks for this '
414  'product. Product location is %s'
415  %(product.did, product.location))
416  product.call_callbacks()
417  else:
418  logger.info('%s: Already completed, but has no '
419  'callbacks. Product location is %s'
420  %(product.did, product.location))
421  else:
422  logger.info('%s: Product has no location. I will not '
423  'call its callbacks.'%(product.did,))
424  logger.warning('Done calling callbacks for already completed '
425  'products.')
426  logger.warning('Called %d callbacks out of %d completed products, '
427  'of %d total products for this job.'
428  %(ncalled,ncomplete,ntotal))
429 
430  def unrun(self):
431  """!Deletes all output products, and marks this task and all
432  subtasks as incomplete."""
433  with self.dstore.transaction():
434  for product in self._all_products:
435  assert(product.location != '')
436  product.undeliver()
437  assert(product.location != '')
438 # produtil.datastore.force_unlock(product)
439  for task in self._subtasks:
440  task.unrun()
441  self.state=UNSTARTED
442 
443  def run_helper(self,one=False,worker=None,raiseall=False,now=False):
444  """!Internal function that underlies run() and runpart()
445 
446  Called from run() and runpart() to perform the actual work.
447  @param one True for runpart(), False for run(). If True, exits
448  after one product is processed.
449  @param worker obsolete and ignored
450  @param raiseall If true, nearly all exceptions are raised.
451  @param now If true, the function will not sleep or wait."""
452  if self.is_completed():
453  return
454  logger=self.log()
455  # waitsleep: sleep time if some inputs were not ready
456  waitsleep=self.confint('waitsleep',10)
457  # readysleep: sleep time if all products were ready
458  readysleep=self.confint('readysleep',2)
459 
460  produtil.fileop.makedirs(os.path.join(self.getdir('intercom'),self.taskname))
461  lockdir=os.path.join(self.getdir('lockdir'),self.taskname)
462  produtil.fileop.makedirs(lockdir,logger=logger)
463  produtil.fileop.makedirs(self.workdir,logger=logger)
464 
465  fail_counts=collections.defaultdict(lambda: 0)
466  all_fails=0
467 
468  for rtime in self._regribstore.times():
469  strtime=rtime.strftime('%Y%m%d.%H%M%S')
470  logger.info('%s: will process time %s'%(self.taskname,strtime))
471 
472  first_n=8
473  first_time_through=True
474  notready=True
475 
476  dummy=LockDummy()
477  attempted_something=True
478  while True:
479  # I have no idea what this was supposed to do, but
480  # it does not do what it claims to do and it
481  # causes random failures of the products job:
482  #
483  # if not attempted_something and not notready:
484  # raise hwrf.exceptions.PostFailed(
485  # 'Gave up: too many remaining tasks failed.')
486  attempted_something=False
487  if all_fails>100:
489  'Gave up: more than 100 regribbing operations failed.')
490 
491  if not first_time_through:
492  sleeptime = waitsleep if notready else readysleep
493  loggify = logger.warning if notready else logger.info
494  loggify('sleep %d...'%(sleeptime,))
495  time.sleep(sleeptime)
496  loggify('done sleeping.')
497  first_time_through=False
498  nincomplete=list()
499  # Find the first five incomplete tasks:
500  with self.dstore.transaction():
501  for rtime in self._regribstore.times():
502  subtask=self._subtasks[rtime]
503  if not subtask.is_completed():
504  subtask.update()
505  if not subtask.is_completed():
506  nincomplete.append( (subtask,rtime) )
507  if len(nincomplete)>=first_n: break
508  logger.warning('Considering these times: %s'%
509  ', '.join([ t.strftime('%Y%m%d-%H%M%S') \
510  for (s,t) in nincomplete ]))
511  if len(nincomplete)<=0:
512  logger.warning('No subtasks incomplete. I think I am '
513  'done running. Will exit regribber now.')
514  break
515  for (subtask,rtime) in nincomplete:
516  strtime=rtime.strftime('%Y%m%d.%H%M%S')
517  logger.info('%s: examine this time...'%(strtime,))
518  if subtask.is_completed():
519  logger.info('%s: already done.'%(strtime,))
520  continue
521  regribber=self._rm.copy()
522  regribber._data=self
523  thisdone=True
524  notready=False
525  for name in self._rm.names():
526  failkey=(name,strtime)
527  if failkey in fail_counts:
528  if fail_counts[failkey]>=3:
529  logger.debug('%s %s: skip: failed too many times.'
530  %(strtime,name))
531  continue
532  else:
533  logger.info('%s %s: fail count is %d'
534  %(strtime,name,fail_counts[failkey]))
535  rst=self._regribstore[rtime]
536  logger.info('%s: consider product %s'%(strtime,name))
537  if name not in rst:
538  logger.info('%s: no %s product at this time.'
539  %(strtime,name))
540  continue
541  try:
542  if self.get_data(name,rtime) is not None:
543  logger.info('%s %s: already done'%(name,strtime))
544  continue
545  if not regribber.is_ready(name,time=rtime,task=self):
546  msg='%s: %s: not ready yet.'%(strtime,name)
547  logger.info(msg)
548  thisdone=False
549  notready=True
550  if now and raiseall: raise GribberError(msg)
551  continue
552  if name in self._regribnames:
553  logger.info('%s %s: will need lock'
554  %(name,strtime))
555  lockfile=os.path.join(lockdir,'%s.%s'
556  %(strtime,name))
558  filename=lockfile,logger=logger,max_tries=1,
559  giveup_quiet=True)
560  else:
561  (product,locker)=(None,dummy)
562 
563  attempted_something=True
564  with locker:
565  if self.get_data(name,rtime,update=True) \
566  is not None:
567  logger.info('%s %s: already done'
568  %(name,strtime))
569  continue
570  with TempDir(prefix='%s/%s.%s.'
571  %(self.taskname,strtime,name),
572  dir=self.workdir,keep=False,
573  keep_on_error=False):
574  if regribber.is_grid(name):
575  logger.info(
576  '%s: %s: calculate this grid.'
577  %(strtime,name))
578  else:
579  logger.warning(
580  '%s: %s: process this grib1/2 '
581  'product.'%(strtime,name))
582  if regribber.is_ready(name,time=rtime,
583  task=self):
584  regribber.make(name,time=rtime,task=self)
585  if self.get_data(name,rtime) is None:
586  msg='%s %s: somehow regribber.make '\
587  'did not deliver anything'%(name,strtime)
588  logger.warning(msg)
589  thisdone=False
590  if raiseall: raise GribberError(msg)
591  except hwrf.exceptions.NoProductError as npe:
592  logger.info('%s %s: internal error: product does '
593  'not exist at this time.'%(name,strtime))
594  raise
595  except produtil.locking.LockHeld as lh:
596  logger.info('%s %s: lock held, move on.'
597  %(name,strtime))
598  thisdone=False
599  except Exception as e:
600  logger.warning('%s %s: failed with exception %s'
601  %(name,strtime,str(e)),exc_info=True)
602  if raiseall: raise
603  all_fails+=1
604  fail_counts[failkey] = fail_counts[failkey]+1
605  if fail_counts[failkey]>=3:
606  logger.error('%s %s: failed too many times'
607  %(strtime,name))
608  thisdone=False
609  if thisdone:
610  logger.info('%s: done.'%(strtime,))
611  subtask.state=COMPLETED
612  if one: return
613  else:
614  logger.info('%s: not done.'%(strtime,))
615  logger.debug('%s: on to next time....'%(strtime,))
616  self.state=COMPLETED
617  def run(self,**kwargs):
618  """!Performs all regribbing, returning when complete.
619 
620  Runs all regribbing. Does not return until all regribbing
621  is complete, or a fatal error happens. It is safe to run this
622  in multiple threads at the same time. Through file locking
623  and database usage, the jobs will work together.
624  @param kwargs keyword arguments passed to run_helper()"""
625  with NamedDir(self.workdir):
626  self.run_helper(False,**kwargs)
627  def runpart(self,**kwargs):
628  """!Performs a small amount of work and returns.
629  @param kwargs keyword arguments passed to run_helper()"""
630  with NamedDir(self.workdir):
631  self.run_helper(True,**kwargs)
632 
def products
Iterates over products:
Definition: gribtask.py:274
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
Definition: locking.py:55
This class represents a GRIB2 file that is produced by this workflow.
Definition: regrib.py:1963
def fcst_hr_min(time, start)
Return forecast time in hours and minutes.
Definition: numerics.py:126
end
the end time of the simulation
Definition: gribtask.py:78
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def run(self, kwargs)
Performs all regribbing, returning when complete.
Definition: gribtask.py:617
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
start
the start time of the simulation
Definition: gribtask.py:77
Raised upon errors that would cause a retry, in the PostOneWRF.run when passed the raiseall=True argu...
Definition: exceptions.py:416
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def nonedict()
Creates a collections.defaultdict that has a default value of None for any key that has no value...
Definition: gribtask.py:28
Represents a process or actor that makes a Product.
Definition: datastore.py:1052
def make_time_array_list(self)
Create an array of times to be processed.
Definition: gribtask.py:119
def runpart(self, kwargs)
Performs a small amount of work and returns.
Definition: gribtask.py:627
Defines StormInfo and related functions for interacting with vitals ATCF data.
Definition: storminfo.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
A piece of data produced by a Task.
Definition: datastore.py:716
def get
Alias for self.meta() Returns the value of the specified metadata key or returns default if it is uns...
Definition: datastore.py:637
def unrun(self)
Deletes all output products, and marks this task and all subtasks as incomplete.
Definition: gribtask.py:430
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
Time manipulation and other numerical routines.
Definition: numerics.py:1
This class is intended to be used with the Python "with TempDir() as t" syntax.
Definition: cd.py:38
def call_completed_callbacks(self)
Calls the Product.call_callback function for all completed and delivered products.
Definition: gribtask.py:397
An hwrf.hwrftask.HWRFTask that performs regribbing operations.
Definition: gribtask.py:46
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def __init__(self, dstore, conf, section, regribmany, start, end, step, atime=None, kwargs)
An hwrf.hwrftask.HWRFTask that performs regribbing operations.
Definition: gribtask.py:57
def log
Obtain a logging domain.
Definition: hwrftask.py:425
parses UNIX conf files and makes the result readily available
Definition: config.py:1
This class represents a GRIB1 file produced by this workflow.
Definition: regrib.py:2132
def make_products(self)
Creates Product objects.
Definition: gribtask.py:196
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward comaptibility.
Definition: hwrftask.py:328
workerdesc
None (unused)
Definition: gribtask.py:96
A time-indexed array that can only handle equally spaced times.
Definition: numerics.py:689
def uncomplete(self)
Marks all tasks as unstarted and products as unavailable.
Definition: gribtask.py:363
Raised when an operation that produces input to Regrib should have produced a Product, but produced nothing at all.
Definition: exceptions.py:268
atime
the "atime" argument to send to Product.product() functions
Definition: gribtask.py:92
A Python "with" block construct that does nothing.
Definition: gribtask.py:33
def __enter__(self)
Do nothing at entry to a "with" block.
Definition: gribtask.py:35
def _make_product
implementation of make_products
Definition: gribtask.py:321
Automates locking of a lockfile.
Definition: locking.py:66
Runs the Unified Post Processor on outputs from the WRF-NMM, producing E grid GRIB files as EGRIB1Pro...
Definition: post.py:1
timestep
the frequency at which GRIB files should be created
Definition: gribtask.py:79
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def is_completed(self)
Is this task complete?
Definition: datastore.py:1180
def nonemaker()
Returns None.
Definition: gribtask.py:25
def iteritems(self)
Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location"...
Definition: datastore.py:677
Describes regribbing operations using an algebraic structure.
Definition: regrib.py:1
Exceptions for hwrf.regrib.GRIBTask for certain internal errors.
Definition: exceptions.py:292
def make_time_array(self)
Create a time-indexed array of None.
Definition: gribtask.py:113
def __exit__(self, etype, evalue, traceback)
Do nothing upon exiting a "with" block.
Definition: gribtask.py:37
def set_data(self, name, data, time=None, logger=None, kwargs)
Stores intermediate results.
Definition: gribtask.py:156
def run_helper
Internal function that underlies run() and runpart()
Definition: gribtask.py:443
def get_data(self, name, time=None, update=False, kwargs)
Returns intermediate results.
Definition: gribtask.py:128