HWRF  trunk@4391
nhc_products.py
1 """!Generates NHC-specific products and validates wrfdiag files.
2 
3 This module implements NHCProducts, a subclass of HWRFTask that
4 runs the hwrf_nhc_products fortran program and delivers its outputs.
5 That program generates rainfall and wind swaths, various text files,
6 and other products."""
7 
8 ##@var __all__
9 # Symbols exported by "from hwrf.nhc_products import *"
10 __all__=['NHCProducts']
11 
12 import math, os, re, glob
16 
17 from math import pi,sqrt
18 from hwrf.numerics import to_fraction, split_fraction
19 from hwrf.constants import Rearth
20 from hwrf.namelist import to_fortnml
21 from produtil.fileop import make_symlink, deliver_file
22 from produtil.run import checkrun, exe, openmp, bigexe, runsync
23 from produtil.cd import TempDir
24 from produtil.datastore import COMPLETED, RUNNING, UpstreamFile, FileProduct
25 
27  """!This is a wrapper around the hwrf_nhc_products Fortran program."""
28  def __init__(self,dstore,conf,section,wrftask,track,domains,
29  vitals,stream='auxhist1',fcstlen=126,**kwargs):
30  """!NHCProducts constructor.
31  @param dstore the produtil.datastore.Datastore
32  @param conf the HWRFConfig object
33  @param section the config section to use
34  @param wrftask the ExternalWRFTask, WRFAtmos or WRFCoupledPOM, or
35  similar task, that can provide wrfdiag files and other WRF
36  outputs.
37  @param track the final tracker output Product
38  @param domains the list of WRFDomains whose wrfdiag files should
39  be processed
40  @param vitals the tcvitals
41  @param stream the WRF output stream for wrfdiag files
42  @param fcstlen forecast length in hours
43  @param kwargs Other keyword arguments are passed to the superclass constructor."""
44  if 'outdir' not in kwargs: kwargs['outdir']=conf.getdir('com')
45  super(NHCProducts,self).__init__(dstore,conf,section,**kwargs)
46  self._wrftask=wrftask
47  self._track=track # final track product
48  assert(track is not None)
49  self._domains=domains
50  self._stream=stream
51  self._vitals=vitals
52  simend=wrftask.wrf().simend()
53  self._wrfdiag=[ x for x in wrftask.products(stream=stream,
54  time=simend) ]
55  self._products=self._make_products()
56  self._name_for_ext=dict()
57  self.__fcstlen=fcstlen
58  def _make_products(self):
59  """!Generates FileProduct objects for all outputs."""
60  pre=os.path.join(self.getdir('com'),
61  self.confstrinterp('{out_prefix}.').lower())
62  PRE=os.path.join(self.getdir('com'),
63  self.confstrinterp('{out_prefix}.').upper())
64  deliverme=dict( wind10m='wind10m.ascii', rainfall='rainfall.ascii',
65  wind10hrly='wind10hrly.ascii')
66  DELIVERME=dict( stats='stats.tpc', htcf='hwrf_d03.htcf', afos='afos')
67  with self.dstore.transaction() as t:
68  prods=dict()
69  for pname,psub in deliverme.iteritems():
70  prod=FileProduct(
71  self.dstore,prodname=pname,category=self.taskname)
72  prod.location=pre+psub
73  prod['locbase']=os.path.basename(pre+psub)
74  prods[pname]=prod
75  for pname,psub in DELIVERME.iteritems():
76  prod=FileProduct(
77  self.dstore,prodname=pname,category=self.taskname)
78  prod.location=PRE+psub
79  prod['locbase']=os.path.basename(PRE+psub)
80  prods[pname]=prod
81  # for name,prod in prods.iteritems():
82  # prod['minsize']=0
83  # prod['minage']=-120
84  for domain in self._domains:
85  diagname='wrfdiag_d%02d'%(domain.get_grid_id(),)
86  prod = FileProduct(dstore=self.dstore, prodname=diagname,
87  category=self.taskname)
88  prod.location = pre+diagname
89  prod['locbase']=os.path.basename(pre+diagname)
90  prods[diagname] = prod
91  return prods
92  def canrun(self,silent=True):
93  """!Determines if the hwrf_nhc_products program can be run yet.
94  @param silent If silent=True, does not log anything.
95  @returns True if the program can run, False otherwise"""
96  with self.dstore.transaction():
97  # Check for track file:
98  if not self._track.available:
99  if not silent:
100  self.log().info('Cannot run: track not yet available. '
101  'Location: %s available: %d'% (
102  str(self._track.location),
103  int(self._track.available)))
104  return False
105  # Check for wrfdiag files. Run the check() method on each
106  # if it is not already available just in case it is an
107  # upstream file.
108  for w in self._wrfdiag:
109  if not w.available:
110  w.check()
111  if not w.available:
112  if not silent:
113  self.log().info('Cannot run: wrfdiag file %s '
114  'not available'%(str(w),))
115  return False
116  return True
117  def get_res_cutoff(self,wrf,fudge_factor=0.8):
118  """!Calculates resolution cutoff information.
119 
120  Calculates the outermost radius from the domain center at
121  which the storm center can be, while still considered to be
122  resolved by that domain. This is used to detect failures of
123  the nest motion, and report which nests actually contain the
124  storm. Iterates over radii for each nest, yielding a radius
125  in km.
126  @param wrf the hwrf.wrf.WRFSimulation being run
127  @param fudge_factor fudge factor to reduce resolution to
128  politically correct values"""
129  first=True
130  for domain in wrf:
131  if first:
132  first=False
133  else:
134  sn=domain.nl.nl_get('domains','e_sn')
135  assert(domain.dy is not None)
136  assert(Rearth is not None)
137  assert(sn is not None)
138  assert(fudge_factor is not None)
139  yield domain.dy*Rearth*pi/180.*sn*sqrt(2.)*fudge_factor
140  def nesting_level(self,moad,nest):
141  """!Determines the nesting level
142  Determines the nesting level of the specified nest relative
143  to the given moad. If nest=moad, the result is 0, if nest is
144  the direct child of moad, then 1, if it is the grandchild,
145  then 2, and so on.
146  @param moad the outermost domain
147  @param nest the nest of interest
148  @returns the nesting level, an integer"""
149  level=0
150  moadid=moad.get_grid_id()
151  dom=nest
152  while level<100 and dom.get_grid_id()!=moadid: # cap at 100
153  # just in case
154  level+=1
155  dom=dom.parent
156  return level
157  def write_namelist(self,f,wrf,moad,inner):
158  """!Writes the products.nml namelist file.
159 
160  This is an internal implementation function; do not call it
161  directly. Writes the products.nml namelist to file object f.
162  @param f the opened file object
163  @param wrf the hwrf.wrf.WRFSimulation
164  @param moad the outermost domain, an hwrf.wrf.WRFDomain
165  @param inner the innermost domain, an hwrf.wrf.WRFDomain"""
166  wrftask=self._wrftask
167  g=moad.nl.nl_get
168  basin1=self.confstr('basin1',section='config').upper()
169  stnum=self.confint('stnum',section='config')
170  assert(isinstance(wrftask.location,basestring))
171 
172  # Multistorm - jtf,st
173  # This will work for both multistorm and non-multistorm.
174  # For basin scale we are now calling wrftask.products(...) instead
175  # of hifreq=inner.hifreq_file(). This allows us to override the
176  # ExternalWRFTask _as_products method in the ForecastWatcher class and
177  # return the correct hifreq name and path.
178  hifreqs=[x for x in wrftask.products(
179  stream='hifreq',domains=[inner])]
180 
181  # hifreqs is a list of 22 of the same (not sure why) UpstreamFile objects.
182  # [UpstreamFile(Datastore,'hifreq_d03.htcf','runwrf')]
183  # The location has the correct path and name of the htcf file.
184  # hifreqs[0].location '..../00L/runwrf/hifreq_d05.htcf'
185  hifreq=os.path.basename(hifreqs[0].location)
186 
187 
188  assert(isinstance(hifreq,basestring))
189 
190  wrftaskloc=wrftask.location
191 
192  # Construct a dict of replacement strings to substitute into
193  # the namelist:
194  dt=to_fraction(moad.dt)
195  (dti,dtn,dtd)=split_fraction(dt) # integer part, numerator,
196  # denominator
197  repl={ 'dx':g('domains','dx'),
198  'dy':g('domains','dy'),
199  'time_step':dti,
200  'time_step_fract_num':dtn,
201  'time_step_fract_den':dtd,
202  'ide':g('domains','e_we'),
203  'jde':g('domains','e_sn'),
204  'YMDH':self.confint('YMDH',section='config'),
205  'inhifreq':os.path.join(str(wrftaskloc),hifreq),
206  'inatcf':self._track.location,
207  'domlat':self.conffloat('domlat',section='config'),
208  'domlon':self.conffloat('domlon',section='config'),
209  'STORM':str(self.storminfo.stormname).upper(),
210  'ATCFID':str(self.storminfo.stormid3).upper(),
211  'TierI_model':to_fortnml(self.confstr('TierI_model','HWRF')),
212  'TierI_submodel':to_fortnml(self.confstr('TierI_submodel',
213  'PARA')),
214  'TierI_realtime':to_fortnml(self.confbool('TierI_realtime',
215  True)),
216  'swathres':to_fortnml(self.conffloat('swathres',0.05)),
217  'swathpad':to_fortnml(self.conffloat('swathpad',0.3)),
218  'grads_byteswap':to_fortnml(self.confbool('grads_byteswap',
219  True)),
220  'nestlev':to_fortnml(self.nesting_level(moad,inner)),
221  'rescut':to_fortnml([float(x) for x in self.get_res_cutoff(
222  wrf)])
223  }
224  # Guess the forecast center from the basin:
225  if(basin1=='L' or basin1=='E' or basin1=='C' or basin1=='Q'):
226  repl['centername']='"NHC"'
227  else:
228  repl['centername']='"JTWC"'
229 
230  repl['fcst_len']=str(int(self.__fcstlen))
231 
232  # Now generate the actual namelist:
233  f.write('''
234 &nhc_products
235  intcvitals='tmpvit'
236  inatcf='{inatcf}'
237  inhifreq='{inhifreq}'
238  inwrfdiag='wrfdiag_d<DOMAIN>'
239  outpre='{STORM}{ATCFID}.{YMDH}.'
240  mdstatus='MDstatus'
241  resolution_cutoffs = {rescut}
242  want_ymdh={YMDH}
243  want_stid={ATCFID}
244  want_centername={centername}
245  coupler_dt=540.
246  fcst_len={fcst_len}
247  ide_moad={ide}
248  jde_moad={jde}
249  dlmd_moad={dx}
250  dphd_moad={dy}
251  clat={domlat}
252  clon={domlon}
253  nesting_level={nestlev}
254  grads_byteswap={grads_byteswap}
255  time_step={time_step}
256  time_step_fract_num={time_step_fract_num}
257  time_step_fract_den={time_step_fract_den}
258  model={TierI_model}
259  submodel={TierI_submodel}
260  realtime={TierI_realtime}
261  swath_latres=0.05
262  swath_lonres=0.05
263  swath_latpad=0.3
264  swath_lonpad=0.3
265  out4wave_out='binary_d<DOMAIN>'
266 /
267 '''.format(**repl))
268  def product(self,name):
269  """!Convenience function that returns the product with the
270  given name, or raises KeyError if none is found."""
271  return self._products[name]
272  def wrfdiag_products(self,what=None):
273  """!Iterates over wrfdiag file products
274  @param what ignored"""
275  for name,prod in self._products.iteritems():
276  assert(isinstance(name,basestring))
277  part=name[0:7]
278  if part=='wrfdiag':
279  yield prod
280  def products(self,what=None):
281  """!Returns Product objects describing files produced by this
282  Task.
283  @param what if specified, the name of the product of interest.
284  Otherwise, all products are iterated over"""
285  if what is not None:
286  if what in self._products:
287  yield self._products[what]
288  else:
289  for product in self._products.itervalues():
290  yield product
291 
292  def run(self,deliver_wrfdiag=False):
293  """!Copies inputs, runs the hwrf_nhc_input, and delivers results.
294  @param deliver_wrfdiag if True, wrfdiag files are also delivered"""
295  self.state=RUNNING
296  wrf=self._wrftask.wrf()
297  moad=wrf.get(self._domains[0])
298  inner=wrf.get(self._domains[-1])
299  logger=self.log()
300  wd=self.workdir
301  dir=os.path.dirname(wd)
302  prefix=os.path.basename(wd)
303  with TempDir(prefix=prefix,dir=dir,logger=logger):
304  runme=self.getexe('hwrf_nhc_products')
305  # Write the namelist:
306  with open('products.nml','wt') as f:
307  self.write_namelist(f,wrf,moad,inner)
308  # Write the tcvitals:
309  with open('tmpvit','wt') as f:
310  if(isinstance(self._vitals,hwrf.storminfo.StormInfo)):
311  f.write("%s\n"%(self._vitals.as_tcvitals(),))
312  else:
313  for vital in self._vitals:
314  f.write("%s\n"%(self._vitals.as_tcvitals(),))
315 
316  # Link all wrfdiag files:
317  for domain in self._domains:
318  # get the last wrfdiag time
319  (start,end,interval)=domain.get_output_range(self._stream)
320  # get the wrfdiag file
321  orig=[x for x in self._wrftask.products(
322  stream=self._stream,domains=[domain],time=end)]
323  # get the path of the last wrfdiag file in that list
324  orig=orig[-1].location
325  # local filename needed by program
326  here='wrfdiag_d%02d'%(domain.get_grid_id(),)
327  # make the symlink
328  make_symlink(orig,here,force=True,logger=logger)
329 
330  # Link to the coupling status file:
331  make_symlink(os.path.join(self._wrftask.location,'MDstatus'),
332  'MDstatus')
333  if self.confbool('sync_frequently',True):
334  runsync()
335  checkrun(openmp(exe(runme),threads=self.confint(
336  'threads',int(os.environ.get('NHC_PRODUCTS_NTHREADS',
337  '1')))),logger=logger)
338  self.deliver_outlist()
339  if deliver_wrfdiag:
340  self.deliver_wrfdiag()
341  self.state=COMPLETED
342  def rewrite_swath_ctl(self,ctlfile):
343  """!Modifies the swath.ctl file to point to a lower-case
344  swath.dat filename.
345  @param ctlfile the file to modify"""
346  newfile='%s.lowerdat'%(ctlfile,)
347  with open(ctlfile,'rt') as fi:
348  with open(newfile,'wt') as fo:
349  for line in fi:
350  m=re.match('^(.*DSET +\^)(.*)$',line)
351  if m: line="%s%s\n"%(m.group(1),m.group(2).lower())
352  fo.write(line)
353  return newfile
354  def _deliver_it(self,fromloc,toloc,keep,logger):
355  """!Helper function to deliver data.
356 
357  This is an internal implementation function used by
358  deliver_outlist. Do not call this directly.
359  @param fromloc source file location
360  @param toloc target file location
361  @param keep if True, keep the origin file
362  @param logger the logging.Logger to use"""
363  assert(fromloc is not None)
364  assert(toloc is not None)
365  assert(isinstance(fromloc,basestring))
366  assert(isinstance(toloc,basestring))
367  assert(keep is not None)
368  assert(logger is not None)
369  keep=bool(keep)
370  tobase=os.path.basename(toloc)
371  for (k,p) in self._products.iteritems():
372  locbase=p['locbase']
373  if tobase==locbase:
374  logger.info('%s is product %s (at %s), deliver that...'
375  %(tobase,p.did,locbase))
376  p.deliver(frominfo=fromloc,keep=keep,logger=logger)
377  return
378  else:
379  logger.info('%s is not product %s (at %s)'
380  %(tobase,p.did,locbase))
381  logger.info('%s has no Product, deliver_file instead'%(toloc,))
382  deliver_file(fromloc,toloc,keep=keep,logger=logger)
383 
384  def deliver_outlist(self):
385  """!Reads the "outlist" output file from hwrf_nhc_products and
386  delivers the listed files to the com directory."""
387  logger=self.log()
388  outfiles=list()
389  od=self.outdir
390  with open('outlist','rt') as outlist:
391  for outfile in outlist:
392  outfile=outfile.rstrip() # remove end-of-line character
393  outfiles.append(outfile)
394  for outfile in outfiles:
395  bn=os.path.basename(outfile)
396  if(re.search('\.swath.ctl',bn)):
397  # Change swath.ctl file to lower-case, and change
398  # the swath.dat filename inside to lower-case:
399  newctl=self.rewrite_swath_ctl(outfile)
400  with open(newctl,'rt') as f:
401  for line in f:
402  line.rstrip()
403  logger.info('NEWCTL: '+repr(line))
404  self._deliver_it(newctl,os.path.join(od,outfile.lower()),
405  keep=False,logger=logger)
406  elif(re.search('\.(afos|stats.tpc|htcf|resolution|htcfstats)$',
407  bn)):
408  # Deliver these twice: once in original (upper)
409  # case, and once in lower-case:
410  assert(outfile.find('swath')<0)
411  logger.info('%s: deliver twice: as upper- and lower-case'
412  %(outfile,))
413  self._deliver_it(outfile,os.path.join(od,outfile.lower()),
414  keep=True,logger=logger)
415  self._deliver_it(outfile,os.path.join(od,outfile),
416  keep=False,logger=logger)
417  elif(re.search('^a.*\.dat$',bn) and re.search('swath',bn)<0):
418  # Deliver these files in original case
419  assert(outfile.find('swath')<0)
420  self._deliver_it(outfile,os.path.join(od,outfile),
421  keep=False,logger=logger)
422  logger.info('%s: deliver as upper-case'%(outfile,))
423  else:
424  # Deliver remaining files in lower-case:
425  logger.info('%s: deliver with original case'%(bn,))
426  self._deliver_it(outfile,os.path.join(od,outfile.lower()),
427  keep=False,logger=logger)
428  for (name,prod) in self._products.iteritems():
429  loc=prod.location
430  if loc and os.path.basename(loc)==os.path.basename(outfile):
431  prod.check(logger=logger)
432  # Note: check instead of deliver because this
433  # is an UpstreamFile object.
434  nprod=0
435  for prod in self._products.iteritems():
436  if isinstance(prod,produtil.datastore.UpstreamFile):
437  prod.check(minage=-100)
438  nprod+=1
439  if not prod.available:
440  logger.warning('%s: not available at %s'%(
441  prod.did,prod.location))
442  else:
443  logger.info('%s: available at %s'%(
444  prod.did,prod.location))
445  logger.info('Checked %d UpstreamFile products'%nprod)
446  def deliver_wrfdiag(self):
447  """!Delivers wrfdiag files to their destination."""
448  ncks=self.getexe('ncks','')
449  logger=self.log()
450  od=self.outdir
451  if not ncks:
452  ncks=produtil.fileop.find_exe('ncks',raise_missing=False)
453  if ncks:
454  def copy(src,tgt,junk):
455  produtil.fileop.remove_file(tgt,logger=logger)
456  checkrun(bigexe(ncks)['-4','-L','6',src,tgt]<'/dev/null',
457  logger=logger)
458  else:
459  copy=None
460  # Copy wrfdiag files to com, converting to compressed NetCDF 4:
461  for prod in self.wrfdiag_products():
462  dest=os.path.join(od,self.confstrinterp(
463  '{out_prefix}.{prodname}',prodname=prod.prodname))
464  src=os.path.join(self._wrftask.location,prod.prodname)
465  logger.info("%s: deliver to %s"%(prod.did,prod.location))
466  prod.deliver(frominfo=src,location=dest,copier=copy)
467  # for filename in glob.glob('binary_d0?'):
468  # dest=self.confstrinterp('{com}/{out_prefix}.{prodname}',
469  # prodname=filename)
470  # logger.info("%s: deliver to %s"%(filename,dest))
471  # produtil.fileop.deliver_file(filename,dest,keep=False,
472  # logger=logger)
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
Constants used throughout the hwrf package.
Definition: constants.py:1
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def _make_products(self)
Generates FileProduct objects for all outputs.
Definition: nhc_products.py:58
A subclass of Product that represents file delivery.
Definition: datastore.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def remove_file
Deletes the specified file.
Definition: fileop.py:251
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def nesting_level(self, moad, nest)
Determines the nesting level Determines the nesting level of the specified nest relative to the given...
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
def wrfdiag_products
Iterates over wrfdiag file products.
Defines StormInfo and related functions for interacting with vitals ATCF data.
Definition: storminfo.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def get_res_cutoff
Calculates resolution cutoff information.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
Time manipulation and other numerical routines.
Definition: numerics.py:1
This class is intended to be used with the Python "with TempDir() as t" syntax.
Definition: cd.py:38
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def write_namelist(self, f, wrf, moad, inner)
Writes the products.nml namelist file.
def products
Returns Product objects describing files produced by this Task.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
Definition: namelist.py:1
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def __init__(self, dstore, conf, section, wrftask, track, domains, vitals, stream='auxhist1', fcstlen=126, kwargs)
NHCProducts constructor.
Definition: nhc_products.py:29
def rewrite_swath_ctl(self, ctlfile)
Modifies the swath.ctl file to point to a lower-case swath.dat filename.
This is a wrapper around the hwrf_nhc_products Fortran program.
Definition: nhc_products.py:26
def product(self, name)
Convenience function that returns the product with the given name, or raises KeyError if none is foun...
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def _deliver_it(self, fromloc, toloc, keep, logger)
Helper function to deliver data.
def canrun
Determines if the hwrf_nhc_products program can be run yet.
Definition: nhc_products.py:92
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
Definition: fileop.py:573
def deliver_wrfdiag(self)
Delivers wrfdiag files to their destination.
def deliver_outlist(self)
Reads the "outlist" output file from hwrf_nhc_products and delivers the listed files to the com direc...
def run
Copies inputs, runs the hwrf_nhc_input, and delivers results.
Storm vitals information from ATCF, B-deck, tcvitals or message files.
Definition: storminfo.py:411
Represents a Product created by an external workflow.
Definition: datastore.py:915