HWRF  trunk@4391
hwrftask.py
1 """!Base class of tasks run by HWRF.
2 
3 This module contains the HWRFTask class, a subclass of
4 produtil.datastore.Task intended to be the base class of any HWRF
5 task. It provides logging services, easy access to a config file,
6 access to tcvitals, and extends the hwrf.config.HWRFConfig string
7 interpolation to include vitals information. It also provides a
8 standard way of setting and querying the work and output directories
9 of a task, and whether the task should scrub its output."""
10 
11 import re, os
12 import hwrf.numerics
13 import produtil.log
14 from produtil.datastore import Task
15 
16 ##@var __all__
17 # Symbols exported by "from hwrf.hwrftask import *"
18 __all__=['HWRFTask']
19 
20 ##@var UNSPECIFIED
21 # Special constant used to detect unspecified arguments to functions.
22 # This allows None to be sent.
23 UNSPECIFIED=object()
24 
25 class HWRFTask(Task):
26  """!The base class of tasks run by the HWRF system.
27 
28  This class represents a task to be run by the HWRF system. It can
29  be configured by an hwrf.config.HWRFConfig object. Internal state
30  information is stored in an produtil.datastore.Datastore. Each
31  task has its own workdir, outdir and scrub flag, as well as its
32  own vitals information.
33 
34  Execution of a task is done by calling run(), and the products
35  generated by this task can be iterated with products()"""
36  def __init__(self,dstore,conf,section,taskname=None,workdir=None,
37  outdir=None,storminfo=UNSPECIFIED,taskvars=UNSPECIFIED,
38  **kwargs):
39  """!Creates an HWRFTask
40  @param dstore passed to Datum: the Datastore object for this Task
41  @param conf the conf object for this task
42  @param section the conf section for this task
43  @param taskname Optional: the taskname in the datastore.
44  Default: the section name
45  @param workdir directory in which this task should run.
46  Any value set in the database will override this value.
47  @param outdir directory where output should be copied. This
48  argument must not be changed throughout the lifetime of
49  the HWRF datstore database file.
50  @param storminfo the storm vitals information for the storm this
51  task is running.
52  @param taskvars additonal variables for string expansion, sent to
53  the taskvars arguments of hwrf.config.HWRFConfig member
54  functions.
55  @param kwargs passed to the parent class constructor."""
56  if taskname is None:
57  taskname=section
58  conf.register_hwrf_task(taskname)
59  self.__taskvars=dict()
60  if taskvars is not UNSPECIFIED:
61  for k,v in taskvars.iteritems():
62  self.tvset(k,v)
63  self._conf=conf
64  self._section=str(section)
65  self._storminfo=storminfo
66  if taskname is not None and not isinstance(taskname,basestring):
67  raise TypeError('The taskname must be None or a basestring '
68  'subclass')
69  if not isinstance(section,basestring):
70  raise TypeError('The section be a basestring subclass')
71  if workdir is None:
72  workdir=self.confstr('workdir','')
73  if workdir is None or workdir=='':
74  workdir=os.path.join(self.getdir('WORKhwrf'),taskname)
75  if outdir is None:
76  outdir=self.confstr('workdir','')
77  if outdir is None or outdir=='':
78  outdir=os.path.join(self.getdir('intercom'),taskname)
79  with dstore.transaction():
80  super(HWRFTask,self).__init__(dstore,taskname=taskname,
81  logger=conf.log(taskname),**kwargs)
82  mworkdir=self.meta('workdir','')
83  moutdir=self.meta('outdir','')
84  if mworkdir:
85  workdir=mworkdir
86  else:
87  self['workdir']=workdir
88  if moutdir:
89  outdir=moutdir
90  else:
91  self['outdir']=outdir
92  if storminfo is UNSPECIFIED:
93  if hasattr(conf,'syndat'):
94  self.storminfo=conf.syndat
95  elif storminfo is not None:
96  if isinstance(storminfo,basestring):
98  self.storminfo.update(conf.items('config'))
99  self.storminfo.parse_vitals(storminfo)
100  elif isinstance(storminfo,hwrf.storminfo.StormInfo):
101  self.storminfo=storminfo.copy()
102  else:
103  raise TypeError("The storminfo argument to HWRFTask() must "
104  'be None, a string or '
105  'hwrf.storminfo.StormInfo')
106  ##@var storminfo
107  # The hwrf.storminfo.StormInfo describing the vitals information
108  # for the storm processed by this HWRFTask.
109 
110  # Intent is to be absolutely certain this is the fakestorm
111  # and that the fake storm id is uppercase.
112  @property
113  def isfakestorm(self):
114  """Is this the fake storm of a multistorm HWRF run?"""
115  isfakestorm = False
116  if self._conf.getbool('config','run_multistorm',False):
117  fakestormid=self._conf.getstr('config','fakestormid', 'nofakeid')
118  if fakestormid != 'nofakeid':
119  if fakestormid == self.storminfo.stormid3:
120  isfakestorm = True
121  elif fakestormid.upper() == self.storminfo.stormid3.upper():
123  "Multistorm run: isfakestorm property can not be "
124  "set since case mismatch between fakestormid: %s "
125  "and stormid3: %s"%(fakestormid,self.storminfo.stormid3))
126  else:
128  "Multistorm run: isfakestorm property can not be set since No "
129  "'fakestormid' in config section of storm %s."
130  "Check if you defined a list of multistorm ids."
131  %(self.storminfo.stormid3))
132 
133  return isfakestorm
134  @property
135  def ismultistorm(self):
136  """Is this a multistorm HWRF run?"""
137  ismultistorm = False
138  if self._conf.getbool('config','run_multistorm',False):
139  ismultistorm = True
140  return ismultistorm
141  def get_workdir(self):
142  """!Returns the directory the class should work in, as set by
143  the "workdir" metadata value."""
144  workdir=self.meta('workdir','')
145  if not workdir:
146  workdir=os.path.join(self.getdir('WORKhwrf'),self.taskname)
147  assert(workdir!='/')
148  return workdir
149  def set_workdir(self,val):
150  """!Sets the directory the class should work in. This sets the
151  "workdir" metadata value.
152  @param val the new work directory"""
153  self['workdir']=str(val)
154  ##@var workdir
155  # The directory in which this task should be run
156  workdir=property(get_workdir,set_workdir,None,
157  """!The directory in which this task should be run.""")
158 
159  def get_outdir(self):
160  """!Gets the directory that should receive output data. This
161  is in the "outdir" metadata value."""
162  outdir=self.meta('outdir','')
163  if not outdir:
164  outdir=os.path.join(self.getdir('intercom'),self.taskname)
165  assert(outdir!='/')
166  return outdir
167  def set_outdir(self,val):
168  """!Sets the directory that should receive output data. Sets
169  the "outdir" metadata value.
170  @param val the new output directory"""
171  self['outdir']=str(val)
172  ##@var outdir
173  # The directory in which this task should deliver its final output.
174  # Note that changing this will NOT update products already in the
175  # database.
176  outdir=property(get_outdir,set_outdir,None,
177  """!The directory to which this task should deliver its final output.""")
178 
179  @property
180  def realtime(self):
181  """!Is this job a real-time forecast job?
182 
183  Is this job running an actual, real-time forecast for an event
184  happening now? This is different than self.conf.realtime: it
185  allows this job's config section to locally override the
186  value."""
187  return self.confbool('realtime',True)
188 
189  @property
190  def redirect(self):
191  """!Should subprograms' outputs be redirected to separate files?"""
192  return self.confbool('redirect',False)
193 
194  @property
195  def scrub(self):
196  """!Should temporary files be deleted as soon as they are not
197  needed?"""
198  return self.confbool('scrub',True)
199 
200  def tvset(self,opt,val):
201  """!Sets a taskvar option's value.
202 
203  Sets an object-local (taskvar) value for option "opt" to value "val".
204  This will override config settings from the HWRFConfig object.
205  These are sent into the taskvars= parameter to the various
206  HWRFConfig member functions (hence the "tv" in "tvset").
207  @param opt the name of the taskvar
208  @param val the string value of the option"""
209  sopt=str(opt)
210  if sopt[0:2]=='__':
212  '%s: invalid option name. Cannot begin with __'%(sopt,))
213  self.__taskvars[sopt]=val
214 
215  def tvdel(self,opt):
216  """!Deletes an object-local value set by tvset.
217  @param opt the name of the taskvar to delete"""
218  sopt=str(opt)
219  del self.__taskvars[sopt]
220 
221  def tvget(self,opt):
222  """!Gets a taskvar's value
223 
224  Returns the value of an object-local (taskvar) option set by tvset.
225  @param opt the taskvar whose value should be returned"""
226  sopt=str(opt)
227  return self.__taskvars[sopt]
228 
229  def tvhave(self,opt=UNSPECIFIED):
230  """!Is a taskvar set?
231 
232  If an option is specified, determines if the given option has
233  an object-local (taskvar) value. If no option is specified,
234  returns True if ANY object-local values (taskvars) exist for
235  any options.
236  @param opt Optional: the name of the taskvar being checked."""
237  if opt is UNSPECIFIED:
238  return len(self.__taskvars)>0
239  sopt=str(opt)
240  return sopt in self.__taskvars
241 
242  @property
243  def taskvars(self):
244  """!The dict of object-local values used for string substitution."""
245  return self.__taskvars
246 
247  def confint(self,opt,default=None,badtypeok=False,section=None,
248  morevars=None):
249  """!Alias for self.conf.getint for section self.section.
250  @param opt the option name
251  @param section Optional: the section. Default: self.section.
252  @param default if specified and not None, then the default is
253  returned if an option has no value or the section does not exist
254  @param badtypeok is True, and the conversion fails, and a
255  default is specified, the default will be returned.
256  @param morevars dict of more variables for string expansion"""
257  if(section is None): section=self._section
258  return self._conf.getint(section,opt,default,badtypeok,
259  morevars=morevars,taskvars=self.__taskvars)
260  def confstr(self,opt,default=None,badtypeok=False,section=None,
261  morevars=None):
262  """!Alias for self.conf.getstr for section self.section.
263  @param opt the option name
264  @param section Optional: the section. Default: self.section
265  @param default if specified and not None, then the default is
266  returned if an option has no value or the section does not exist
267  @param badtypeok is True, and the conversion fails, and a
268  default is specified, the default will be returned.
269  @param morevars dict of more variables for string expansion"""
270  if(section is None): section=self._section
271  return self._conf.getstr(section,opt,default,badtypeok,
272  morevars=morevars,taskvars=self.__taskvars)
273  def conffloat(self,opt,default=None,badtypeok=False,section=None,
274  morevars=None):
275  """!Alias for self.conf.getfloat for section self.section.
276  @param opt the option name
277  @param section Optional: the section. Default: self.section
278  @param default if specified and not None, then the default is
279  returned if an option has no value or the section does not exist
280  @param badtypeok is True, and the conversion fails, and a
281  default is specified, the default will be returned.
282  @param morevars dict of more variables for string expansion"""
283  if(section is None): section=self._section
284  return self._conf.getfloat(section,opt,default,badtypeok,
285  morevars=morevars,taskvars=self.__taskvars)
286  def confbool(self,opt,default=None,badtypeok=False,section=None,
287  morevars=None):
288  """!Alias for self.conf.getbool for section self.section.
289  @param opt the option name
290  @param section Optional: the section. Default: self.section
291  @param default if specified and not None, then the default is
292  returned if an option has no value or the section does not exist
293  @param badtypeok is True, and the conversion fails, and a
294  default is specified, the default will be returned.
295  @param morevars dict of more variables for string expansion"""
296  if(section is None): section=self._section
297  return self._conf.getbool(section,opt,default,badtypeok,
298  morevars=morevars,taskvars=self.__taskvars)
299  def confget(self,opt,default=None,badtypeok=False,section=None,
300  morevars=None):
301  """!Alias for self.conf.get for section self.section.
302  @param opt the option name
303  @param section Optional: the section. Default: self.section
304  @param default if specified and not None, then the default is
305  returned if an option has no value or the section does not exist
306  @param badtypeok is True, and the conversion fails, and a
307  default is specified, the default will be returned.
308  @param morevars dict of more variables for string expansion"""
309  if(section is None): section=self._section
310  return self._conf.get(section,opt,default,badtypeok,
311  morevars=morevars,taskvars=self.__taskvars)
312  def confitems(self,section=None,morevars=None):
313  """!Alias for self.conf.items for section self.section.
314  @param section Optional: the section. Default: self.section.
315  @param morevars variables for string substitution"""
316  if(section is None): section=self._section
317  return self._conf.items(section,morevars=morevars,taskvars=self.__taskvars)
318 
319  def confstrinterp(self,string,section=None,**kwargs):
320  """!Alias for self.icstr for backward compatibility
321  @param string the string to expand
322  @param section Optional: the section in which to expand it.
323  Default: self.section.
324  @param kwargs: more arguments for string substitution"""
325  return self.icstr(string,section=section,**kwargs)
326 
327  def conftimestrinterp(self,string,ftime,atime=None,section=None,
328  **kwargs):
329  """!Alias for self.timestr for backward comaptibility
330  @param string the string to expand
331  @param ftime: the forecast time
332  @param atime: Optional: the analysis time. Default: self.conf.cycle
333  @param section Optional: the section in which to expand it.
334  Default: self.section.
335  @param kwargs: more arguments for string substitution"""
336  return self.timestr(string,ftime,atime=atime,section=section,
337  taskvars=self.__taskvars,**kwargs)
338  def confraw(self,opt,default=None,section=None):
339  """!Get a raw configuration value before string expansion.
340 
341  Returns the raw, uninterpolated value for the specified
342  option, raising an exception if that option is unset. Will
343  not search other sections, and will not search the taskvars,
344  unlike other conf accessors.
345  @param opt the option of interest
346  @param section Optional: the section. Default: self.section
347  @param default Optional: value to return if nothing is found."""
348  if section is None: section=self.section
349  return self._conf.getraw(section,opt,default)
350 
351  def icstr(self,string,section=None,**kwargs):
352  """!Expands a string in the given conf section.
353 
354  Given a string, expand it as if it was a value in the
355  specified conf section. Makes this objects tcvitals, if any,
356  available via the "vit" variable while interpolating strings.
357  @param string the string to expand
358  @param section Optional: the section in which to expand it.
359  Default: self.section.
360  @param kwargs: more arguments for string substitution"""
361  if(section is None): section=self._section
362  if self.storminfo and 'vit' not in kwargs:
363  kwargs['vit']=self.storminfo.__dict__
364  return self._conf.strinterp(section,string,taskvars=self.__taskvars,
365  **kwargs)
366 
367  def timestr(self,string,ftime,atime=None,section=None,**kwargs):
368  """!Expands a string in the given conf section, including time vars
369 
370  Expands a string in the given conf section (default:
371  self.section), and includes forecast and analysis time
372  (default: conf.cycle) information in the variables that can be
373  expanded. The mandatory ftime argument is the forecast time
374  which will be used to expand values such as fHH, fYMDH, etc.
375  The optional atime will be used to expand aHH, aYMDH, etc.,
376  and the two will be used together for forecast minus analysis
377  fields like fahr. See hwrf.config.timestrinterp for details
378 
379  As with self.icstr, this class's vitals are available via the
380  "vit" variable while interpolating strings.
381  @param string the string to expand
382  @param ftime: the forecast time
383  @param atime: Optional: the analysis time. Default: self.conf.cycle
384  @param section Optional: the section in which to expand it.
385  Default: self.section.
386  @param kwargs: more arguments for string substitution"""
387  if(section is None): section=self._section
388  if self.storminfo and 'vit' not in kwargs:
389  kwargs['vit']=self.storminfo.__dict__
390  if 'taskvars' in kwargs:
391  return self._conf.timestrinterp(section,string,ftime,atime,**kwargs)
392  else:
393  return self._conf.timestrinterp(section,string,ftime,atime,
394  taskvars=self.__taskvars,**kwargs)
395 
396  def getdir(self,opt,default=None,morevars=None):
397  """!Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
398  @param opt the option name
399  @param default Optional: default value if nothing is found.
400  @param morevars Optional: more variables for string substitution"""
401  return self._conf.get('dir',opt,default,morevars=morevars,
402  taskvars=self.__taskvars)
403  def getexe(self,opt,default=None,morevars=None):
404  """!Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
405  @param opt the option name
406  @param default Optional: default value if nothing is found.
407  @param morevars Optional: more variables for string substitution"""
408  return self._conf.get('exe',opt,default,morevars=morevars,
409  taskvars=self.__taskvars)
410  def getconf(self):
411  """!Returns this HWRFTask's hwrf.config.HWRFConfig object."""
412  return self._conf
413  ##@var conf
414  # This HWRFTask's hwrf.config.HWRFConfig object
415  conf=property(getconf,None,None,
416  """!The HWRFConfig for this HWRFTask (read-only)""")
417  def getsection(self):
418  """!Returns this HWRFTask's section name in the HWRFConfig."""
419  return self._section
420  ##@var section
421  # The confsection in self.section for this HWRFTask (read-only)
422  section=property(getsection,None,None,
423  """!The confsection in self.section for this HWRFTask (read-only)""")
424 
425  def log(self,subdom=None):
426  """!Obtain a logging domain.
427 
428  Creates or returns a logging.Logger. If subdom is None or
429  unspecified, returns a cached logger for this task's logging
430  domain. Otherwise, returns a logger for the specified
431  subdomain of this task's logging domain.
432  @param subdom Optional: the desired logging domain"""
433  if subdom is None:
434  return self._logger
435  return self._conf.log(self.taskname+'.'+str(subdom))
436  def inputiter(self):
437  """!Iterates over all inputs required by this task.
438 
439  Iterates over dict-like objects suitable for input to
440  hwrf.input.InputSource.get. Each object contains the
441  following keywords:
442  * dataset: string name of the dataset (gfs, gdas1, gefs,
443  enkf, etc.)
444  * item: string name of the object (ie.: gfs_sf, gfs_sfcanl, bufr)
445  * atime: self.conf.cycle
446  * ftime: only present when relevant: the forecast time, in a
447  format accepted by to_datetime_rel
448  * enkfmem: only present when relevant: the ENKF member ID
449  * obstype: only present when relevant: the bufr data type.
450  Other keywords may be present if needed. They will be passed on
451  by hwrf.input.InputSource for string replacement."""
452  return
453  yield {} # ensures this is an iterator
454 
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
Raised when one tries to use an invalid string for an option name.
Definition: exceptions.py:26
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def meta
Return the value of a metadata key.
Definition: datastore.py:625
def confitems
Alias for self.conf.items for section self.section.
Definition: hwrftask.py:312
def redirect(self)
Should subprograms' outputs be redirected to separate files?
Definition: hwrftask.py:190
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def tvset(self, opt, val)
Sets a taskvar option's value.
Definition: hwrftask.py:200
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def getconf(self)
Returns this HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:410
storminfo
The hwrf.storminfo.StormInfo describing the vitals information for the storm processed by this HWRFTa...
Definition: hwrftask.py:94
def get_workdir(self)
Returns the directory the class should work in, as set by the "workdir" metadata value.
Definition: hwrftask.py:141
Represents a process or actor that makes a Product.
Definition: datastore.py:1052
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
def tvhave
Is a taskvar set?
Definition: hwrftask.py:229
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
def confraw
Get a raw configuration value before string expansion.
Definition: hwrftask.py:338
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
Time manipulation and other numerical routines.
Definition: numerics.py:1
def tvdel(self, opt)
Deletes an object-local value set by tvset.
Definition: hwrftask.py:215
def confget
Alias for self.conf.get for section self.section.
Definition: hwrftask.py:300
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def timestr(self, string, ftime, atime=None, section=None, kwargs)
Expands a string in the given conf section, including time vars.
Definition: hwrftask.py:367
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def tvget(self, opt)
Gets a taskvar's value.
Definition: hwrftask.py:221
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def __init__(self, dstore, conf, section, taskname=None, workdir=None, outdir=None, storminfo=UNSPECIFIED, taskvars=UNSPECIFIED, kwargs)
Creates an HWRFTask.
Definition: hwrftask.py:38
def set_workdir(self, val)
Sets the directory the class should work in.
Definition: hwrftask.py:149
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward comaptibility.
Definition: hwrftask.py:328
Configures logging.
Definition: log.py:1
def ismultistorm(self)
Definition: hwrftask.py:135
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def get_outdir(self)
Gets the directory that should receive output data.
Definition: hwrftask.py:159
Raised when the requested configuration conf or hwrf_expt files fail a sanity check.
Definition: exceptions.py:88
def taskvars(self)
The dict of object-local values used for string substitution.
Definition: hwrftask.py:243
def isfakestorm(self)
Definition: hwrftask.py:113
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
def inputiter(self)
Iterates over all inputs required by this task.
Definition: hwrftask.py:436
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
Definition: hwrftask.py:351
Storm vitals information from ATCF, B-deck, tcvitals or message files.
Definition: storminfo.py:411
def set_outdir(self, val)
Sets the directory that should receive output data.
Definition: hwrftask.py:167
def getsection(self)
Returns this HWRFTask's section name in the HWRFConfig.
Definition: hwrftask.py:417