HWRF  trunk@4391
wps.py
1 """!This module contains Tasks to run the WRF Preprocessing System
2 (WPS): Geogrid, Ungrib and Metgrid."""
3 
4 ##@var __all__
5 # Ensures nothing is exported by "from hwrf.wps import *"
6 __all__=[] # prevent accidental "from hwrf.wps import *"
7 
8 import os, shutil, collections, glob, time, math, re, itertools
9 import string, urlparse, datetime, collections
10 
11 import hwrf.namelist
12 import hwrf.exceptions
13 
14 import produtil.fileop
15 import produtil.run
16 import produtil.locking
17 
18 from hwrf.hwrftask import HWRFTask
19 from hwrf.exceptions import WPSError, UngribSubsetError, GeogridNoOutput, \
20  TimestepTooShort, GeogridNoLog, UngribNoInput, UngribInputUnknown
21 from hwrf.numerics import partial_ordering, TimeArray, to_timedelta, \
22  to_datetime_rel, to_fraction, to_datetime
23 from produtil.datastore import FileProduct, COMPLETED, FAILED, RUNNING
24 from produtil.cd import TempDir, NamedDir
25 from produtil.fileop import *
26 from produtil.run import runstr,checkrun,alias,bigexe
27 from collections import defaultdict
28 
29 ########################################################################
30 
31 ##@var _wps_namelist_order
32 # An hwrf.numerics.partial_ordering for sorting the WPS namelists
33 _wps_namelist_order = partial_ordering([
34  'share',
35  'geogrid',
36  'ungrib',
37  'metgrid',
38  'mod_levs',
39  ])
40 
41 ##@var _wps_nl_var_order
42 # A mapping from namelist name to hwrf.numerics.partial_ordering
43 # objects that sort namelist variables within each namelist.
44 _wps_nl_var_order = {
45  'share' : partial_ordering([
46  'wrf_core',
47  'max_dom',
48  'start_date',
49  'end_date',
50  'max_dom',
51  'interval_seconds',
52  'io_form_geogrid',
53  'io_form_metgrid',
54  'nocolons',
55  ]),
56  'geogrid' : partial_ordering([
57  'parent_id',
58  'parent_grid_ratio',
59  'i_parent_start',
60  'j_parent_start',
61  'e_we',
62  'e_sn',
63  'geog_data_res',
64  'dx',
65  'dy',
66  'map_proj',
67  'ref_lat',
68  'ref_lon',
69  'geog_data_path',
70  'opt_geogrid_tbl_path',
71  'ref_x',
72  'ref_y',
73  ]),
74  'ungrib' : partial_ordering([
75  'out_format',
76  'prefix',
77  ]),
78  'metgrid' : partial_ordering([
79  'fg_name',
80  'io_form_metgrid',
81  'opt_metgrid_tbl_path',
82  ]),
83  'mod_levs' : partial_ordering([
84  'press_pa',
85  ]),
86  }
87 
88 ########################################################################
89 
91  """!This subclass of HWRFTask represents a WPS Task. Multiple WPS
92  jobs run in the same work directory. This class allows the jobs
93  to be represented together as a set, with one work directory
94  specified at the top level. This class exists only to reduce code
95  duplication by letting Metgrid, Geogrid and Ungrib share a
96  superclass."""
97  def __init__(self, dstore, conf, section, sim, domains, taskname=None,
98  wpsdir=None, starttime=None, increment=None, endtime=None,
99  parent_atime=None, geogrid_from=None, **kwargs):
100  """!Create a new WPSTask.
101 
102  @param dstore The produtil.datastore.Datastore for database information
103  @param conf the hwrf.config.HWRFConfig for configuration info
104  @param section the section to use in conf
105  @param sim The "sim" is the
106  WRFSimulation for which we are preparing input.
107  @param domains The domains must be a list of WRFDomain
108  objects.
109  @param taskname Optional: the name of this Task. Default: config section name.
110  @param wpsdir Optional: the directory in which to run WPS.
111  Default: taskname subdirectory of conf.getdir("WORKhwrf")
112 
113  @param starttime,endtime Optional: simulation length. Default: same as sim.
114  @param increment Optional: boundary input interval. Default: get from sim.
115  @param parent_atime Optional: analysis time of parent model. Default: starttime
116  @param geogrid_from Optional: a Geogrid object whose output
117  should be linked before running this WPS step. This is
118  used when the WPS Ungrib and Metgrid are run more than
119  once on the same domain. For example, one may run
120  multiple analysis times or multiple forecast lengths off
121  of the same geogrid output.
122  @param kwargs Other options are sent to the hwrf.hwrftask.HWRFTask.__init__ constructor.
123  """
124  if taskname is None: taskname=section
125  super(WPSTask,self).__init__(dstore,conf,section,taskname,**kwargs)
126  self.location=self.workdir
127  self._sim=sim
128  # Get WRF's copy of the domains so we have the parent-nest info:
129  self._domains=[sim[domain] for domain in domains]
130  self._products=None
131  self._geogrid_from=None
132  if geogrid_from is not None:
133  if not isinstance(geogrid_from,Geogrid):
134  raise TypeError(
135  'The geogrid_from parameter to WPSTask.__init__ must '
136  'be a Geogrid. You provided a %s %s.'
137  %(type(geogrid_from).__name__,repr(geogrid_from)))
138  self._geogrid_from=geogrid_from
139  self.nl = hwrf.namelist.Conf2Namelist(conf, self.confstr('namelist'))
140  if starttime is None: starttime=sim.simstart()
141  self.starttime=to_datetime(starttime)
142  if endtime is None: endtime=sim.simend()
143  self.endtime=to_datetime_rel(endtime,starttime)
144  self.increment=increment if increment is not None else 6*3600
145  self.increment=to_timedelta(self.increment)
146  if parent_atime is None:
147  parent_atime=starttime
148  self.parent_atime=parent_atime
149  if self.increment<to_timedelta(300):
150  raise TimestepTooShort(
151  'Geogrid timestep %s is smaller than 300 seconds.'
152  %(repr(self.increment),))
153  self.log().debug('%s times: start=%s end=%s increment=%s'%(
154  self.taskname, str(self.starttime), str(self.endtime),
155  str(self.increment)))
156  self._wps_namelist()
157  self.make_products()
158 
159  ##@var nl
160  # an hwrf.namelist.Conf2Namelist with the namelist.wps information
161 
162  ##@var starttime
163  # Simulation start time.
164 
165  ##@var endtime
166  # Simulation end time.
167 
168  ##@var increment
169  # Simulation boundary input interval
170 
171  ##@var parent_atime
172  # Parent model analysis time
173 
174  def _guess_nocolons(self):
175  """!Guesses whether we should use colons in filenames. This
176  information is obtained from sim.get_nocolons(). Note that
177  this requires re-scanning all input and output io_forms, so
178  this function is moderately expensive."""
179  nc=self._sim.get_nocolons()
180  return nc
181 
182  ##@property nocolons
183  # Read-only property that guesses whether colons should be
184  # omitted from filenames (True) or not (False)
185  nocolons=property(_guess_nocolons,None,None, \
186  """A property that is True if WPS will omit colons from the
187  output filenames, and False otherwise.""")
188 
189  def io_suffix(self,stream):
190  """!Guesses the file suffix (.nc, etc.) for the specified stream.
191 
192  Tries to guess what io suffix WPS will use for a stream. If
193  the stream is not one of the known streams, this function logs
194  a warning and tries to continue, but will likely fail.
195 
196  @param stream the stream of interest.
197  """
198  if(stream=='metgrid' or stream=='auxinput1'):
199  return self._sim.get_io_suffix('auxinput1')
200  elif(stream=='geogrid' or stream=='auxinput2'):
201  return self._sim.get_io_suffix('auxinput2')
202  else:
203  self.log().warning(
204  'caller requested unknown stream "%s" in WPSTask.io_suffix'
205  %(stream,))
206  return self._sim.get_io_suffix('input')
207 
208  def _maxdom(self):
209  """!The number of WRF domains, and highest domain number."""
210  return self._sim.maxdom()
211 
212  ##@property maxdom
213  # The number of WRF domains.
214  maxdom=property(_maxdom,None,None,"The number of WRF domains.")
215  def _MOAD(self):
216  """!The mother of all domains as a WRFDomain."""
217  return self._sim.get(1)
218 
219  ##@property MOAD
220  # The Mother Of All Domains (MOAD) as an hwrf.wrf.WRFDomain
221  MOAD=property(_MOAD,None,None,"Returns the Mother of All Domains (MOAD)")
222 
223  def getsim(self):
224  """!The WRFSimulation object for which we are preparing input."""
225  return self._sim
226 
227  def check_geogrid(self):
228  """!Checks to see if the geogrid MOAD output file is present
229  and non-empty in the current working directory. Raises
230  GeogridNoOutput if the output is missing or empty."""
231  suffix=self.io_suffix('geogrid')
232  filename='geo_nmm.d01.%s'%(suffix,)
233  if not produtil.fileop.isnonempty(filename):
234  raise GeogridNoOutput('%s: does not exist or is empty'
235  %(filename,))
236 
237  def link_geogrid(self):
238  """!Links geogrid output from the specified Geogrid task to the
239  current working directory. Will raise an exception if Geogrid
240  has not produced its outputs yet. This is used when multiple
241  Metgrid executions are used with the same Geogrid. The
242  geogrid only needs to be run once since its outputs do not
243  change with time, just with domain configuration.
244 
245  Specifically, this finds all Product objects in task
246  self._geogrid_from whose prodname contains "geo" and links
247  from the product's location to its prodname in the current
248  working directory. This logic must match the logic in
249  Geogrid.make_products and WPSTask.check_geogrid."""
250 
251  logger=self.log()
252  fromtask=self._geogrid_from
253  if fromtask is None:
254  logger.info('Not linking geogrid inputs. I hope you already '
255  'ran geogrid in this directory.')
256  return
257  else:
258  logger.info('Task %s linking geo products from %s task %s'
259  %(self.taskname,type(fromtask).__name__,
260  fromtask.taskname))
261 
262  for product in fromtask.products():
263  did=product.did
264  name=product.prodname
265  if name.find('geo')>=0:
266  logger.info(did+': should copy to '+name)
267  else:
268  logger.info(did+': does not contain "geo" so skipping this.')
269  continue
270 
271  # We need to link this file. Make sure it looks good first:
272  loc=product.location
273  av=product.available
274  msg=None
275  if not av:
276  msg=did+': product is not available'
277  elif loc is None or loc=='':
278  msg=did+': product has no location (but is available)'
279  elif not os.path.exists(loc):
280  msg=did+': file does not exist: '+repr(loc)
281  elif not produtil.fileop.isnonempty(loc):
282  msg=did+': file is empty: '+repr(loc)
283  if msg is not None:
284  logger.warning(msg)
285  raise GeogridNoOutput(msg)
286 
288  loc,name,force=True,logger=logger)
289  def times(self):
290  """!Iterates over all output times."""
291  now=self.starttime
292  end=self.endtime
293  dt=self.increment
294  while not (now>end):
295  yield now
296  now+=dt
297 
298  ##@property sim
299  # Read-only property: the hwrf.wrf.WRFSimulation for this WPSTask
300  sim=property(getsim,None,None,
301  """Returns the WRF simulation for this WPSTask""")
302  def domains(self):
303  """!Iterates over the domains in this WPSTask"""
304  for domain in self._domains: yield domain
305  def link_fix(self,geog_data=False,table=None):
306  """!Links all fix files for ungrib to the current working directory.
307  @param geog_data if True, also link the geog-data directory
308  @param table name of the table file symbolic link"""
309  logger=self.log()
310  try:
311  if table:
312  tbl=self.confstr('tbl')
313  tblhere=str(table)
314  logger.info('Link table file %s to %s'%(tblhere,tbl))
315  make_symlink(tbl,tblhere,force=True,logger=logger)
316  if geog_data:
317  make_symlink(self.getdir('geog_data'),'geog-data',force=True,
318  logger=logger)
319  except Exception as e:
320  # No need for exc_info here since the run() jobs add that already.
321  logger.warning('cannot link to WPS fix files: '+str(e))
322  raise
323  def make_product_structure(self,stream):
324  """!Generates the self._products data structure used by the
325  products method. Should be called by make_products in
326  subclasses."""
327  if stream=='metgrid' and self.endtime == self.starttime:
328  self._products=collections.defaultdict(
329  lambda: hwrf.numerics.TimeArray(
330  self.starttime,self.starttime+self.increment,self.increment))
331  else:
332  self._products=collections.defaultdict(
333  lambda: hwrf.numerics.TimeArray(
334  self.starttime,self.endtime,self.increment))
335  def make_products(self):
336  """!This subroutine should be implemented in subclasses. It
337  should call self.make_product_structure(), and then add
338  products by doing:
339  @code
340  self._products[domain.moad_ratio()][time]=product
341  @endcode
342  where the domain is a WRFDomainBase subclass, the time is
343  anything accepted by to_datetime_rel's first argument, and the
344  product is any subclass of Product."""
345  def products(self,time=None,domain=None):
346  """!Iterates over all products
347  @param domain the hwrf.wrf.WRFDomain
348  @param time the time
349  @note One, both or neither argument can be specified. All matching
350  products are yielded."""
351  if self._products is not None:
352  if domain is not None and domain.moad_ratio() in self._products:
353  tprod=self._products[domain.moad_ratio()]
354  if time is None:
355  for (time,product) in tprod.itervalues():
356  localprod=product
357  assert(localprod is not None)
358  yield localprod
359  elif time in tprod:
360  localprod=tprod[time]
361  assert(localprod is not None)
362  yield localprod
363  else:
364  if time is not None:
365  epsilon=hwrf.numerics.to_timedelta(10) # 10 seconds
366  for tprod in self._products.itervalues():
367  for thetime,product in tprod.iteritems():
368  if time is not None:
370  thetime,time,epsilon):
371  continue # skip this product because
372  # it is not at the right
373  # time.
374  assert(product is not None)
375  yield product
376 
377  def make_namelist(self):
378  """!Returns the namelist.wps contents as a string."""
379  return self.nl.remove_traits().set_sorters(_wps_namelist_order,
380  _wps_nl_var_order).make_namelist()
381  def undeliver_products(self,time=None,domain=None,fromloc=None):
382  """!Deletes all delivered products and marks them as
383  unavailable.
384  @param time,domain passed to products() to get the list of products
385  @param fromloc Ignored."""
386  logger=self.log()
387  time=self.starttime
388  prodlist=[p for p in self.products(time=time,domain=domain)]
389  for p in prodlist:
390  p.undeliver(logger=logger)
391  def deliver_products(self,time=None,domain=None,fromloc=None,
392  keep=True,relink=False):
393  """!This is called from self.run to deliver all products to the
394  intercom directory.
395 
396  @param time,domain The optional time and domain arguments
397  are sent to self.products.
398  @param fromloc By default, this routine assumes that the file
399  to be delivered is in the current working directory with the
400  same name as the destination file. To change that, give a
401  lambda function in "fromloc", which converts the destination
402  filename (the only argument) to a local filename.
403  @param keep The "keep" argument has the same meaning as in
404  deliver_file: if False, the file may be moved to the
405  destination.
406  @param relink If True, and the file is moved, then a symlink
407  will be made from the original file location to the
408  destination."""
409  link_files=self.confbool('link_wrf_fix',True)
410  keep=keep and link_files
411  logger=self.log()
412  if time is None:
413  time=self.starttime
414  bad=False
415  if fromloc is None: fromloc=os.path.basename
416  prodlist=[p for p in self.products(time=time,domain=domain)]
417  for p in prodlist:
418  try:
419  f=p.location
420  if f is None or len(f)<1:
421  bad=True
422  msg='%s: product has no location; cannot deliver it'%(
423  p.did,)
424  logger.warning(msg)
425  raise WPSError(msg)
426  fl=fromloc(f)
427  assert(isinstance(fl,basestring)) # from file should
428  # be a string
429  p.deliver(frominfo=fl,keep=keep,logger=logger)
430  if not keep and not os.path.exists(fl):
431  linkfile=fl
432  destfile=p.location
433  logger.info('%s: file was moved to destination %s'
434  %(linkfile,destfile))
435  if relink:
436  logger.info('%s: relink from delivered file %s'
437  %(linkfile,destfile))
438  make_symlink(destfile,linkfile,force=True,
439  logger=logger)
440  else:
441  logger.info('%s: not relinking. File now at %s'
442  %(linkfile,destfile))
443  except EnvironmentError as e:
444  logger.warning('%s: cannot deliver file'%(f,),exc_info=True)
445  bad=True
446  if bad:
447  logger.critical('some output files were empty or missing; '
448  'aborting')
449  def _wps_namelist(self):
450  """!Fills the self.nl namelist with correct information. This
451  must be called before make_namelist, and hence it is called
452  from the constructor."""
453  s = self.nl.nl_set
454  siu = self.nl.nl_set_if_unset
455  t = self.nl.trait_get
456  maxdom = self.maxdom
457  start = self.starttime.strftime("%Y-%m-%d_%H:%M:%S")
458  end = self.endtime
459 
460  s('share', 'max_dom', maxdom)
461  s('share', 'start_date', start)
462  s('share', 'end_date', end)
463  s('share', 'interval_seconds', int(self._conf.get('wrf','bdystep')))
464  io_form_geogrid=int(self.sim.io_form_for('auxinput2'))%100
465  io_form_metgrid=int(self.sim.io_form_for('auxinput1'))%100
466  if io_form_metgrid==11: io_form_metgrid=2
467  if io_form_geogrid==11: io_form_geogrid=2
468  s('share', 'io_form_geogrid', io_form_geogrid)
469  s('metgrid', 'io_form_metgrid', io_form_metgrid)
470  s('share','nocolons',self.nocolons)
471 
472  pid = []
473  gid = []
474  pgr = []
475  nx = []
476  ny = []
477  dx = []
478  dy = []
479  istart = []
480  jstart = []
481  res = []
482 
483  resolution = '10m' if (maxdom<3) else '2m'
484 
485  for d in self.domains():
486  if d.parent is None:
487  pid.append(1)
488  pgr.append(1)
489  dx.append(d.nl.trait_get('dx'))
490  dy.append(d.nl.trait_get('dy'))
491  istart.append(1)
492  jstart.append(1)
493  else:
494  pid.append(d.parent.get_grid_id())
495  pgr.append(d.nl.trait_get('parent_grid_ratio'))
496  start=str(d.nl.trait_get('start','auto')).lower()
497  if start == 'fixed':
498  istart.append(int(d.nl.trait_get('istart')))
499  jstart.append(int(d.nl.trait_get('jstart')))
500  else:
501  istart.append(10)
502  jstart.append(10)
503 
504  gid.append(d.get_grid_id())
505  nx.append(d.nl.trait_get('nx'))
506  ny.append(d.nl.trait_get('ny'))
507  res.append(resolution)
508 
509  # Set (s()) anything in the namelist that must have specific
510  # values. For values the user can override, set only if it is
511  # already unset (siu())
512  s ('geogrid', 'parent_id', pid)
513  s ('geogrid', 'parent_grid_ratio', pgr)
514  s ('geogrid', 'e_we', nx)
515  s ('geogrid', 'e_sn', ny)
516  s ('geogrid', 'dx', dx)
517  s ('geogrid', 'dy', dy)
518  s ('geogrid', 'i_parent_start', istart)
519  siu('geogrid', 'geog_data_res', res)
520  siu('geogrid', 'map_proj','rotated_ll')
521  s ('geogrid', 'geog_data_path', './geog-data/')
522  s ('geogrid', 'opt_geogrid_tbl_path','./')
523  s ('geogrid', 'j_parent_start', jstart)
524  s ('geogrid', 'ref_lat', self.conffloat('domlat',section='config'))
525  s ('geogrid', 'ref_lon', self.conffloat('domlon',section='config'))
526 
527  s ('metgrid', 'opt_metgrid_tbl_path','./')
528  s ('metgrid', 'fg_name','FILE')
529 
530  s ('ungrib', 'prefix','FILE')
531  siu('ungrib', 'out_format','WPS')
532 
533  siu('share', 'wrf_core','NMM')
534 
535 ########################################################################
537  """!This is a HWRF task that pre processes the geogrid to define the
538  model domains and interpolates static geographical data to the
539  grids."""
540 
541  def __init__(self,*args,**kwargs):
542  """!Creates a new Geogrid.
543  @param args,kwargs All arguments are passed to the
544  WPSTask.__init__() constructor."""
545  super(Geogrid, self).__init__(*args,**kwargs)
546 
547  def make_products(self):
548  """!Creates the FileProduct objects for this Geogrid.
549 
550  @note Implementation note: If you change the list of products,
551  make sure all geogrid outputs needed as input to ungrib or
552  metgrid have a prodname that is the same as the destination
553  file."""
554  self.make_product_structure('geogrid')
555  suffix = self.io_suffix('geogrid')
556  time=self.starttime
557  for d in self.domains():
558  id = d.get_grid_id()
559  if d.parent is None:
560  f = "geo_nmm.d%02d.%s" %(id, suffix)
561  else:
562  f = "geo_nmm_nest.l%02d.%s" %(id -1, suffix)
563  dest=os.path.join(self.outdir,f)
564  prod=FileProduct(self._dstore, f, self.taskname,
565  meta={"domain": d.name}, location=dest)
566  prod.location=dest
567  self.log().debug('geogrid made product %s with location %s'
568  %(repr(prod.did),repr(prod.location)))
569  self._products[d.moad_ratio()][time]=prod
570  def geodat(self,dom):
571  """!Returns the FileProduct for the geogrid data for the
572  specified nesting ratio. The specified domain does not have
573  to be one of the known domains. It just has to have the same
574  nest:parent ration (WRFDomain.moad_ratio) as one.
575  @param dom The hwrf.wrf.WRFDomain of interest."""
576  ratio=dom.moad_ratio()
577  assert(ratio in self._products)
578  return self._products[ratio][self.starttime]
579  def run(self):
580  """!Copies inputs, links fix files, runs geogrid and delivers
581  results."""
582  logger=self.log()
583  try:
585  with NamedDir(self.location) as dir:
586  logger.info('Geogrid running in directory: '+os.getcwd())
587  assert(not re.match('\A/tmp',os.getcwd()))
588 
589  for f in glob.glob('geo*'):
590  try:
591  produtil.fileop.remove_file(f,logger=logger)
592  except(EnvironmentError) as e:
593  logger.warning('%s: did not remove file, but '
594  'continuing anyway'%(f,))
595  self.link_fix(geog_data=True,table='GEOGRID.TBL')
596 
597  with open('namelist.wps', 'w') as f:
598  f.write(self.make_namelist())
599 
600  prog = self.getexe('hwrf_geogrid')
601  log = self._section + '.log'
603  allranks=True)
604  if self.redirect: cmd=cmd > log
605  logger.info('%s command: %s'%(self.taskname, repr(cmd),))
606  produtil.run.checkrun(cmd,logger=logger)
607  findme="Successful completion"
608  geogrid_log=None
609  for glog in ( 'geogrid.log', 'geogrid.log.0000',
610  'geogrid.log.00000' ):
611  if os.path.exists(glog):
612  geogrid_log=glog
613  else:
614  logger.info('%s: does not exist.'%(glog,))
615  if geogrid_log is None:
616  msg='WPS Geogrid failed: could not find geogrid log file.'
617  logger.error(msg)
618  self.state=FAILED
619  raise GeogridNoLog(msg)
620  logger.info('%s: will check for %s'%(geogrid_log,findme))
621  if not check_last_lines(geogrid_log,findme):
622  raise WPSError('%s: did not find "%s"'
623  %(geogrid_log,findme))
624  self.deliver_products(keep=False,relink=True)
625 
626  except WPSError as we:
627  self.state=FAILED
628  raise
629  except Exception as e:
630  logger.critical('WPS Geogrid failed: '+str(e),exc_info=True)
631  self.state=FAILED
632  raise
633  self.state=COMPLETED
634  self.postmsg('WPS Geogrid completed.')
635 
636 ########################################################################
638  """!This is a HWRF task that extracts the meteorological fields
639  from GRIB formatted files and write the fields to intermediate
640  files. It is a wrapper around the WPS ungrib.exe program."""
641 
642  def __init__(self,*args,**kwargs):
643  """!Creates a new Ungrib. All arguments are passed to the
644  WRFTask constructor. The only new arguments are:
645 
646  * in_ftime - Optional: the parent model forecast hour to use
647  for analysis time data in this Ungrib.
648 
649  * in_item, in_dataset - Optional: the item and dataset, in
650  hwrf.input terminology, to use to request the GRIB1/2
651  input files.
652 
653  * in_item2, in_dataset2 - Optional: same as in_item and
654  in_dataset, but for a second GRIB file that is appended to
655  the first.
656 
657  * inputs - Optional: an hwrf.input.DataCatalog to provide
658  input file locations. Default: construct one using the
659  catalog name from the "catalog" option in this objects
660  config section.
661 
662  @param args,kwargs passed to WPSTask.__init__ """
663  super(Ungrib, self).__init__(*args,**kwargs)
664  if 'inputs' in kwargs and kwargs['inputs']:
665  self.inputs=kwargs['inputs']
666  else:
667  hd=self.confstr('catalog','hwrfdata')
669  self.conf,hd,self.sim.simstart())
670  self.__one_time=bool(kwargs.get('one_time',False))
671 
672  # Get input model's analysis time:
673  in_atime=kwargs['in_atime'] if('in_atime' in kwargs) else \
674  self.confstr('atime','')
675  if in_atime is None or in_atime=='':
676  self.in_atime=self.sim.simstart()
677  else:
678  self.in_atime=to_datetime_rel(in_atime,self.sim.simstart())
679 
680  # Get first forecast hour to process from input model:
681  in_ftime=kwargs['in_ftime'] if('in_ftime' in kwargs) else None
682  if in_ftime is None: in_ftime=self.confint('ftime',0)
683  if isinstance(in_ftime,datetime.datetime):
684  in_ftime=in_ftime-in_atime
685  if not isinstance(in_ftime,int) and not isinstance(in_ftime,float) \
686  and not isinstance(in_ftime,datetime.timedelta):
687  raise TypeError('in_ftime must be an int, a float or a '
688  'timedelta, not a %s (in_ftime=%s)'%(
689  in_ftime.__class__.__name__,repr(in_ftime)))
690 
691  self.in_ftime = to_timedelta(in_ftime)
692  self.in_dataset = str(kwargs.get(
693  'in_dataset',self.confstr('dataset','hwrfdata')))
694  self.in_dataset2 = str(kwargs.get(
695  'in_dataset2',self.confstr('dataset2',self.in_dataset)))
696  self.in_item = str(kwargs.get(
697  'in_item',self.confstr('item','gfs_pgrb')))
698  self.in_item2 = str(kwargs.get(
699  'in_item2',self.confstr('item2','')))
700  self.log().debug('self.in_item=%s dataset=%s section=%s'%(
701  repr(self.in_item),repr(self.in_dataset),
702  repr(self.section)))
703  self._n_gribfiles = 0
704 
705  # if self.gribs_per_time>1 and (
706  # self.confstr('subset','')!=''
707  # or self.confstr('subset_grib1','')!=''
708  # or self.confstr('subset_grib2','')!=''):
709  # raise hwrf.exceptions.UngribCannotSubset(
710  # 'hwrf.wps.Ungrib does not know how to merge two GRIB '
711  # 'files and subset them.')
712 
713  ##@var inputs
714  # the hwrf.input.DataCatalog to use for obtaining input data
715 
716  ##@var in_atime
717  # Parent model analysis time
718 
719  ##@var in_ftime
720  # Parent model forecast hour that maps to the analysis time of this model
721 
722  ##@var in_dataset
723  # Input dataset for the first GRIB source
724 
725  ##@var in_dataset2
726  # Input dataset for the second GRIB source
727 
728  ##@var in_item
729  # Input item for the first GRIB source
730 
731  ##@var in_item2
732  # Input item for the second GRIB source
733 
734  @property
735  def one_time(self):
736  """!If True, we are pretending that hour 0 is valid for all
737  times. This is equivalent to constant boundary conditions.
738  If in_ftime is non-zero, then that hour is used instead."""
739  return self.__one_time
740 
741  @property
742  def gribs_per_time(self):
743  """!How many grib files are processed for each time? This is 2
744  if an item2 or in_item2 were given in the config or
745  constructor, respectively. Otherwise it is 1."""
746  if self.in_item2 is not None and self.in_item2!='':
747  return 2
748  return 1
749 
750  def inputiter(self):
751  """!Iterates over all input files needed. This is meant to be
752  plugged in to an hwrf.input.InputSource to obtain input data
753  in the scripts.exhwrf_input job."""
754  start=self.starttime
755  for t in self.times():
756  dt=t-start
757  if self.one_time: dt=0
758  ftime=to_datetime_rel(dt,to_datetime_rel(self.in_ftime,
759  self.in_atime))
760  yield dict(self.taskvars,dataset=self.in_dataset,
761  item=self.in_item,ftime=ftime,atime=self.in_atime)
762  if not self.in_item2 or not self.in_dataset2: continue
763  yield dict(self.taskvars,dataset=self.in_dataset2,
764  item=self.in_item2,ftime=ftime,atime=self.in_atime,
765  optional=self.confbool('item2_optional',False))
766 
767  def input_at(self,dt,igrib=1,require=False):
768  """!Finds input data for a specific time and GRIB file
769  @param dt the forecast time as a datetime.datetime, relative
770  to the in_ftime
771  @param igrib 1 or 2, which input file is used (in_item or in_item2
772  This is needed for models like GFS and GEFS that split their
773  GRIB files into two parts.
774  @param require if True, then hwrf.exceptions.UngribNoInput is
775  raised when inputs cannot be found."""
776  if self.__one_time: dt=0
777  logger=self.log()
778  item=self.in_item
779  dataset=self.in_dataset
780  ftime=to_datetime_rel(dt,to_datetime_rel(self.in_ftime,
781  self.in_atime))
782  stratime=self.in_atime.strftime("%Y%m%d%H")
783  logger.info("Check for dataset=%s item=%s ftime=%s atime=%s in %s"%(
784  dataset, item, ftime.strftime("%Y%m%d%H"), stratime,
785  repr(self.inputs) ))
786  logger.debug('inputs: '+repr(self.inputs))
787  if igrib==1:
788  ret=self.inputs.locate(self.in_dataset,self.in_item,
789  ftime=ftime,atime=self.in_atime,
790  **self.taskvars)
791  else:
792  ret=self.inputs.locate(self.in_dataset2,self.in_item2,
793  ftime=ftime,atime=self.in_atime,
794  **self.taskvars)
795  self.log().info("Got back: "+repr(ret))
796  if require and (ret is None or ret==''):
797  raise UngribNoInput(
798  "Cannot find input for: dataset=%s item=%s ftime=%s igrib=%d"
799  "atime=%s in %s"%(dataset, item, ftime.strftime("%Y%m%d%H"),
800  stratime, repr(self.inputs),igrib ))
801  return ret
802  def get_grib(self,require=False):
803  """!Link or copies all the input GRIB files to the current
804  working directory. Note that if two grib files are requested,
805  then this is done by copying the data.
806  @param require if True, then hwrf.exceptions.UngribNoInput is
807  raised when inputs cannot be found."""
808  logger=self.log()
809  j = 0
810  files = []
811  start=self.starttime
812  igribmax=self.gribs_per_time
813  for t in self.times():
814  dt=t-start # forecast time as a datetime.timedelta
815  fhr=int(math.ceil(to_fraction(dt)/3600)) # forecast hour as int
816  logger.info('Need to get input for t-start=%s-%s=%s=hour %s '
817  'igribmax=%s'%(
818  repr(t),repr(start),repr(dt),repr(fhr),repr(igribmax)))
819  opt2=self.confbool('item2_optional',False)
820  for igrib in xrange(igribmax):
821  #f = "gfs.t%02dz.pgrb%sf%02d" %(start.hour, g2, fhr)
822  f=self.input_at(dt,igrib=igrib+1)
823  if igrib==1 and opt2:
824  continue
825  if f is None or f=='':
826  raise UngribNoInput(
827  "Cannot find input for hour %d"%(fhr,))
828 
829  files.append(f)
830  logger.info('Input for hour %s is %s'%(repr(fhr),repr(f)))
831 
832  if self.realtime:
833  if not wait_for_files(
834  files,logger,
835  maxwait=self.confint('max_grib_wait',1800),
836  sleeptime=self.confint('grib_sleep_time',20),
837  min_size=self.confint('min_grib_size',1),
838  min_mtime_age=self.confint('min_grib_age',30),
839  min_atime_age=None,
840  min_ctime_age=None,
841  min_fraction=1.0):
842 
843  logger.error('Some input GRIB files do not exist. Giving '
844  'up.')
845  raise UngribNoInput(
846  'Some GRIB files are missing. See the log for details.')
847 
848  names=dict()
849  if self.gribs_per_time>1:
850  logger.info('Merging GRIB files.')
851  for t in self.times():
852  dt=t-start # forecast time as a datetime.timedelta
853  fhr=int(math.ceil(to_fraction(dt)/3600)) # forecast
854  # hour as int
855  in1=self.input_at(dt,igrib=1)
856  in2=self.input_at(dt,igrib=2)
857  assert(in1!=in2)
858  out=self._rename_grib()
859  names[t]=out
860  logger.info('%s: merge GRIBs for time %s here'
861  %(repr(out),repr(fhr)))
862  with open(out,'wb') as outf:
863  logger.info('%s: copy from %s'%(repr(out),repr(in1)))
864  with open(in1,'rb') as inf1:
865  shutil.copyfileobj(inf1,outf)
866  try:
867  logger.info('%s: copy from %s'%(repr(out),repr(in2)))
868  with open(in2,'rb') as inf2:
869  shutil.copyfileobj(inf2,outf)
870  except EnvironmentError as e:
871  opt=self.confbool('item2_optional',False)
872  if opt:
873  logger.warning('%s: ignoring exception'%(
874  str(in2),),exc_info=True)
875  else:
876  raise
877  logger.info('%s: done'%(repr(out),))
878  else:
879  logger.info('Not subsetting or merging GRIB files')
880  # Don't subset. Just link:
881  for t in self.times():
882  dt=t-start # forecast time as a datetime.timedelta
883  in1=self.input_at(dt,igrib=1)
884  out=self._rename_grib()
885  names[t]=out
886  make_symlink(in1,out,force=True,logger=self.log())
887 
888  subset0=self.confstr('subset','')
889  subset1file=self.confstr('subset_grib1',subset0)
890  subset2file=self.confstr('subset_grib2',subset0)
891  if subset1file or subset2file:
892  logger.info('Subsetting GRIB files')
893  cmd2=alias(bigexe(self.getexe('wgrib2','wgrib2')))
894  cmd1=alias(bigexe(self.getexe('wgrib')))
895  subset1=list()
896  subset2=list()
897 
898  if subset1file:
899  with open(subset1file,'rt') as f:
900  for line in f:
901  if line: subset1.append(line.rstrip())
902  subset1_re=re.compile('|'.join(subset1))
903  else:
904  subset1_re=None
905 
906  if subset2file:
907  with open(subset1file,'rt') as f:
908  for line in f:
909  if line: subset2.append(line.rstrip())
910  subset2_re=re.compile('|'.join(subset2))
911  else:
912  subset2_re=None
913 
914  for t in self.times():
915  srcfile=names[t]
916  tgtfile=os.path.basename(srcfile)+".subsetted"
917  cmd=None
918  subset_re=None
919  gribver=produtil.fileop.gribver(srcfile)
920  if gribver==2:
921  if subset2file:
922  cmd=cmd2
923  subset_re=subset2_re
924  elif gribver==1:
925  if subset1file:
926  cmd=cmd1
927  subset_re=subset1_re
928  else:
929  raise UngribInputUnknown(
930  "%s: is neither GRIB1 nor GRIB2."%(srcfile,))
931 
932  if cmd is not None and subset_re is not None:
933  logger.info("%s: subsetting from %s"%(tgtfile,srcfile))
934  self._subset_grib(srcfile,tgtfile,cmd,subset_re)
935  logger.info('%s: delete and replace with subset %s'%(srcfile,tgtfile))
936  os.unlink(srcfile)
937  os.rename(tgtfile,srcfile)
938  else:
939  logger.info("%s: not subsetting"%(srcfile))
940 
941  def _subset_grib(self, srcfile,tgtfile,cmd,matcher):
942  """!Runs wgrib on a GRIB1 input file to get its content
943  listing. Then runs wgrib again to subset it.
944  * srcfile - the input file, to scan and subset
945  * tgtfile - the location for the new, subset file
946  * cmd - a produtil.prog.ImmutableRunner for wgrib
947  * matcher - a regular expression, or anything else that has
948  a .search method. Each call to search takes one line and
949  returns True if it should be included in the subset, or
950  False otherwise. """
951  subset=''
952  (k,d)=(0,0) # keep, discard
953  for line in runstr(cmd[srcfile],logger=self.log()).splitlines(True):
954  if matcher.search(line):
955  subset+=line
956  k+=1
957  else:
958  d+=1
959  if not k:
960  raise UngribSubsetError('%s: no matching records in file'%(
961  tgtfile,))
962  self.log().info('%s => %s: keeping %d records, discarding %d'%(
963  srcfile,tgtfile,k,d))
964  remove_file(tgtfile)
965  runme=cmd['-i','-grib','-o',tgtfile,srcfile] << subset
966  self.log().info('run: %s'%(repr(runme),))
967  checkrun(runme,logger=self.log())
968  if not produtil.fileop.isnonempty(srcfile):
969  raise UngribSubsetError('%s: file is non-existent or empty'
970  %(tgtfile,))
971 
972  def _rename_grib(self, filename=None):
973  """!Generates a GRIB filename using the input name expected by
974  WPS: GRIBFILE.AAA for the first, GRIBFILE.AAB for the second,
975  and so on. An internal counter self._n_gribfiles keeps track
976  of the number of files requested so far. The optional
977  filename argument is ignored.
978  @param filename Ignored.
979  @returns the new filename chosen."""
980  sufs = [a+b+c for a,b,c in itertools.product(
981  string.ascii_uppercase, repeat = 3)]
982  new_filename = "GRIBFILE.%s" %(sufs[self._n_gribfiles])
983  self._n_gribfiles += 1
984  return new_filename
985 
986  def run(self):
987  """!Links inputs and runs ungrib. Ungrib has no deliverables:
988  it only places files in the local directory for a later
989  Metgrid.run() to pick up and use."""
990  logger=self.log() # avoid several function calls
991  try:
993  with NamedDir(self.location) as dir:
994  logger.info('Ungrib starting in %s'%(os.getcwd(),))
995 
996  assert(not re.match('\A/tmp',os.getcwd()))
997 
998  with open('namelist.wps', 'w') as f:
999  f.write(self.make_namelist())
1000 
1001  self.link_fix(table='Vtable')
1002  self.get_grib()
1003 
1004  prog = self.getexe('hwrf_ungrib')
1005  log = self._section + '.log'
1006  cmd = produtil.run.exe(prog)
1007  if self.redirect: cmd = cmd > log
1008  logger.info('%s command: %s'%(self.taskname, repr(cmd),))
1009  produtil.run.checkrun(cmd,logger=logger)
1010  self.deliver_products()
1011  self.check_outfiles()
1012  except Exception as e:
1013  logger.critical('WPS Ungrib failed: '+str(e),exc_info=True)
1014  self.state=FAILED
1015  raise
1016  self.state=COMPLETED
1017  self.postmsg('WPS Ungrib completed')
1018 
1019  def check_outfiles(self):
1020  """!Checks the expected ungrib output files to make sure they
1021  all exist and are non-empty."""
1022  bad=False
1023  logger=self.log()
1024  for i in self.times():
1025  f = "%s/FILE:%s" %(self.location, i.strftime("%Y-%m-%d_%H"))
1027  logger.info('%s: exists, is non-empty.'%(f,))
1028  else:
1029  logger.warning('%s: does not exist or is empty'%(f,))
1030  bad=True
1031  if bad:
1032  logger.error('WPS Ungrib failed: some output files did not '
1033  'exist or were empty. See stdout/stderr log for '
1034  'details.')
1035  raise WPSError(
1036  "WPS Ungrib output file %s does not exist or is empty" %f)
1037  def deliver_products(self,*args,**kwargs):
1038  """!Does nothing. Ungrib has no products to deliver.
1039  @param args,kwargs Ignored."""
1040  def products(self,**kwargs):
1041  """!Ungrib delivers no products. Everything is kept in the WPS
1042  temporary directory and reused by metgrid. The Metgrid
1043  implementation assumes it is running in the same directory as
1044  Ungrib, so no call to products() is needed.
1045  @param kwargs Ignored."""
1046  if False: yield # necessary for syntactic reasons
1047 
1048 ########################################################################
1050  """!This is a HWRF task that horizontally interpolates the
1051  meteorological fields extracted by ungrib to the model grids
1052  defined by geogrid. It is a wrapper around the WPS metgrid.exe
1053  Fortran program."""
1054 
1055  def __init__(self, *args,**kwargs):
1056  """!Creates a new Metgrid.
1057  @param args,kwargs All arguments are passed to the
1058  WPSTask.__init__() constructor."""
1059  super(Metgrid, self).__init__(*args,**kwargs)
1060  self._n_gribfiles=0
1061  def run(self):
1062  """!Copies inputs, runs metgrid, delivers outputs.
1063 
1064  @note Ungrib must have been run in the same directory
1065  beforehand. The Geogrid must have been run in the same
1066  directory, unless it was passed as input via geogrid_from= in
1067  the constructor."""
1068  logger=self.log()
1069  try:
1071  with NamedDir(self.location,keep=not self.scrub) as dir:
1072  logger.info('WPS Metgrid running in dir: '+os.getcwd())
1073  assert(not re.match('\A/tmp',os.getcwd()))
1074 
1075  with open('namelist.wps', 'w') as f:
1076  f.write(self.make_namelist())
1077 
1078  self.link_geogrid()
1079  self.check_geogrid()
1080 
1081  self.link_fix(table='METGRID.TBL')
1082  prog = self.getexe('hwrf_metgrid')
1083  log = self._section + '.log'
1085  allranks=True)
1086  if self.redirect: cmd = cmd > log
1087  logger.info('%s command: %s'%(self.taskname, repr(cmd),))
1088  ok = produtil.run.checkrun(cmd,logger=logger)
1089  for time in self.times():
1090  logger.info('%s: deliver products for this time.'
1091  %(time.strftime('%Y-%m-%d %H:%M:%S')))
1092  self.deliver_products(keep=False,relink=True,time=time)
1093  if self.endtime == self.starttime:
1094  p=self._products[1][time]
1095  timeb=self.starttime+self.increment
1096  pb=self._products[1][timeb]
1097  make_symlink(p.location,pb.location,force=True,
1098  logger=logger)
1099  pb.available=True
1100 
1101  except Exception as e:
1102  logger.critical('WPS Metgrid failed: %s'%(str(e),),
1103  exc_info=True)
1104  self.state=FAILED
1105  raise
1106  self.state=COMPLETED
1107  self.postmsg('WPS Metgrid completed')
1108  def met_at_time(self,when):
1109  """!Returns a FileProduct for the Metgrid file at the specified
1110  time or None if no such product is found
1111  @param when the time of interest"""
1112  if when in self._products[1]:
1113  return self._products[1][when]
1114  else:
1115  return None
1116  def make_products(self):
1117  """!Generates FileProduct objects for this Metgrid. This is
1118  called automatically from the constructor."""
1119  self.make_product_structure('metgrid')
1120  MOAD=self.MOAD
1121  suffix=self.io_suffix('metgrid')
1122  format = ("%Y-%m-%d_%H_%M_%S" if self.nocolons else \
1123  "%Y-%m-%d_%H:%M:%S")
1124  for time in self.times():
1125  f="met_nmm.d01.%s.%s"%(time.strftime(format),suffix)
1126  loc=os.path.join(self.outdir,f)
1127  prod=FileProduct(self.dstore, f, self.taskname, location=loc)
1128  prod.location=loc
1129  self._products[1][time] = prod
1130 
1131  if self.endtime == self.starttime:
1132  timeb=self.starttime+self.increment
1133  fb="met_nmm.d01.%s.%s"%(timeb.strftime(format),suffix)
1134  locb=os.path.join(self.outdir,fb)
1135  prodb=FileProduct(self.dstore, fb, self.taskname, location=locb)
1136  self._products[1][timeb] = prodb
1137 
1138 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def mpirun(arg, kwargs)
Converts an MPI program specification into a runnable shell program suitable for run(), runstr() or checkrun().
Definition: run.py:258
in_item2
Input item for the second GRIB source.
Definition: wps.py:698
def getsim(self)
The WRFSimulation object for which we are preparing input.
Definition: wps.py:223
Generates a Fortran namelist entirely from config files.
Definition: namelist.py:411
def get_grib
Link or copies all the input GRIB files to the current working directory.
Definition: wps.py:802
def to_timedelta
Converts an object to a datetime.timedelta.
Definition: numerics.py:371
def link_geogrid(self)
Links geogrid output from the specified Geogrid task to the current working directory.
Definition: wps.py:237
parent_atime
Parent model analysis time.
Definition: wps.py:148
def input_at
Finds input data for a specific time and GRIB file.
Definition: wps.py:767
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def gribs_per_time(self)
How many grib files are processed for each time? This is 2 if an item2 or in_item2 were given in the ...
Definition: wps.py:742
Raised when wgrib or wgrib2 generates an empty or invalid file.
Definition: exceptions.py:374
def redirect(self)
Should subprograms' outputs be redirected to separate files?
Definition: hwrftask.py:190
MOAD
The Mother Of All Domains (MOAD) as an hwrf.wrf.WRFDomain.
Definition: wps.py:221
Handles file locking using Python "with" blocks.
Definition: locking.py:1
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def _wps_namelist(self)
Fills the self.nl namelist with correct information.
Definition: wps.py:449
A subclass of Product that represents file delivery.
Definition: datastore.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def remove_file
Deletes the specified file.
Definition: fileop.py:251
def io_suffix(self, stream)
Guesses the file suffix (.nc, etc.) for the specified stream.
Definition: wps.py:189
Base class of WPS-related exceptions.
Definition: exceptions.py:358
This is a HWRF task that horizontally interpolates the meteorological fields extracted by ungrib to t...
Definition: wps.py:1049
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
starttime
Simulation start time.
Definition: wps.py:141
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
in_item
Input item for the first GRIB source.
Definition: wps.py:696
def _rename_grib
Generates a GRIB filename using the input name expected by WPS: GRIBFILE.AAA for the first...
Definition: wps.py:972
endtime
Simulation end time.
Definition: wps.py:143
in_dataset2
Input dataset for the second GRIB source.
Definition: wps.py:694
def check_outfiles(self)
Checks the expected ungrib output files to make sure they all exist and are non-empty.
Definition: wps.py:1019
in_dataset
Input dataset for the first GRIB source.
Definition: wps.py:692
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
Definition: run.py:398
def within_dt_epsilon(time1, time2, epsilon)
Returns True if time1 is within epsilon of time2, and False otherwise.
Definition: numerics.py:198
def run(self)
Links inputs and runs ungrib.
Definition: wps.py:986
def __init__(self, args, kwargs)
Creates a new Metgrid.
Definition: wps.py:1055
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
def gribver(filename)
What is the GRIB version of this file?
Definition: fileop.py:201
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def run(self)
Copies inputs, links fix files, runs geogrid and delivers results.
Definition: wps.py:579
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
def make_namelist(self)
Returns the namelist.wps contents as a string.
Definition: wps.py:377
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
nocolons
Read-only property that guesses whether colons should be omitted from filenames (True) or not (False)...
Definition: wps.py:185
def met_at_time(self, when)
Returns a FileProduct for the Metgrid file at the specified time or None if no such product is found...
Definition: wps.py:1108
nl
an hwrf.namelist.Conf2Namelist with the namelist.wps information
Definition: wps.py:139
Raised when a geogrid output file is missing or empty.
Definition: exceptions.py:364
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
def products(self, kwargs)
Ungrib delivers no products.
Definition: wps.py:1040
in_ftime
Parent model forecast hour that maps to the analysis time of this model.
Definition: wps.py:691
def deliver_products
This is called from self.run to deliver all products to the intercom directory.
Definition: wps.py:392
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
Time manipulation and other numerical routines.
Definition: numerics.py:1
def make_product_structure(self, stream)
Generates the self._products data structure used by the products method.
Definition: wps.py:323
maxdom
The number of WRF domains.
Definition: wps.py:214
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
Definition: namelist.py:1
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def _subset_grib(self, srcfile, tgtfile, cmd, matcher)
Runs wgrib on a GRIB1 input file to get its content listing.
Definition: wps.py:941
def inputiter(self)
Iterates over all input files needed.
Definition: wps.py:750
def make_products(self)
Generates FileProduct objects for this Metgrid.
Definition: wps.py:1116
def deliver_products(self, args, kwargs)
Does nothing.
Definition: wps.py:1037
A time-indexed array that can only handle equally spaced times.
Definition: numerics.py:689
This subclass of HWRFTask represents a WPS Task.
Definition: wps.py:90
increment
Simulation boundary input interval.
Definition: wps.py:144
Sorts a pre-determined list of objects, placing unknown items at a specified location.
Definition: numerics.py:21
def __init__(self, args, kwargs)
Creates a new Geogrid.
Definition: wps.py:541
This is a HWRF task that extracts the meteorological fields from GRIB formatted files and write the f...
Definition: wps.py:637
def __init__(self, args, kwargs)
Creates a new Ungrib.
Definition: wps.py:642
This is a HWRF task that pre processes the geogrid to define the model domains and interpolates stati...
Definition: wps.py:536
def undeliver_products
Deletes all delivered products and marks them as unavailable.
Definition: wps.py:381
def products
Iterates over all products.
Definition: wps.py:345
def domains(self)
Iterates over the domains in this WPSTask.
Definition: wps.py:302
def run(self)
Copies inputs, runs metgrid, delivers outputs.
Definition: wps.py:1061
Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp...
Definition: input.py:109
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
Definition: run.py:242
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
def __init__(self, dstore, conf, section, sim, domains, taskname=None, wpsdir=None, starttime=None, increment=None, endtime=None, parent_atime=None, geogrid_from=None, kwargs)
Create a new WPSTask.
Definition: wps.py:99
def times(self)
Iterates over all output times.
Definition: wps.py:289
def check_geogrid(self)
Checks to see if the geogrid MOAD output file is present and non-empty in the current working directo...
Definition: wps.py:227
def geodat(self, dom)
Returns the FileProduct for the geogrid data for the specified nesting ratio.
Definition: wps.py:570
def mpi(arg, kwargs)
Returns an MPIRank object that represents the specified MPI executable.
Definition: run.py:465
def taskvars(self)
The dict of object-local values used for string substitution.
Definition: hwrftask.py:243
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
inputs
the hwrf.input.DataCatalog to use for obtaining input data
Definition: wps.py:665
in_atime
Parent model analysis time.
Definition: wps.py:676
def make_products(self)
This subroutine should be implemented in subclasses.
Definition: wps.py:335
def link_fix
Links all fix files for ungrib to the current working directory.
Definition: wps.py:305
def one_time(self)
If True, we are pretending that hour 0 is valid for all times.
Definition: wps.py:735
def make_products(self)
Creates the FileProduct objects for this Geogrid.
Definition: wps.py:547
def make_symlink
Creates a symbolic link "target" that points to "source".
Definition: fileop.py:677