1 """!Generates NHC-specific products and validates wrfdiag files.
3 This module implements NHCProducts, a subclass of HWRFTask that
4 runs the hwrf_nhc_products fortran program and delivers its outputs.
5 That program generates rainfall and wind swaths, various text files,
10 __all__=[
'NHCProducts']
12 import math, os, re, glob
17 from math
import pi,sqrt
22 from produtil.run import checkrun, exe, openmp, bigexe, runsync
27 """!This is a wrapper around the hwrf_nhc_products Fortran program."""
28 def __init__(self,dstore,conf,section,wrftask,track,domains,
29 vitals,stream=
'auxhist1',fcstlen=126,**kwargs):
30 """!NHCProducts constructor.
31 @param dstore the produtil.datastore.Datastore
32 @param conf the HWRFConfig object
33 @param section the config section to use
34 @param wrftask the ExternalWRFTask, WRFAtmos or WRFCoupledPOM, or
35 similar task, that can provide wrfdiag files and other WRF
37 @param track the final tracker output Product
38 @param domains the list of WRFDomains whose wrfdiag files should
40 @param vitals the tcvitals
41 @param stream the WRF output stream for wrfdiag files
42 @param fcstlen forecast length in hours
43 @param kwargs Other keyword arguments are passed to the superclass constructor."""
44 if 'outdir' not in kwargs: kwargs[
'outdir']=conf.getdir(
'com')
45 super(NHCProducts,self).
__init__(dstore,conf,section,**kwargs)
48 assert(track
is not None)
52 simend=wrftask.wrf().simend()
53 self.
_wrfdiag=[ x
for x
in wrftask.products(stream=stream,
58 def _make_products(self):
59 """!Generates FileProduct objects for all outputs."""
60 pre=os.path.join(self.
getdir(
'com'),
62 PRE=os.path.join(self.
getdir(
'com'),
64 deliverme=dict( wind10m=
'wind10m.ascii', rainfall=
'rainfall.ascii',
65 wind10hrly=
'wind10hrly.ascii')
66 DELIVERME=dict( stats=
'stats.tpc', htcf=
'hwrf_d03.htcf', afos=
'afos')
67 with self.dstore.transaction()
as t:
69 for pname,psub
in deliverme.iteritems():
72 prod.location=pre+psub
73 prod[
'locbase']=os.path.basename(pre+psub)
75 for pname,psub
in DELIVERME.iteritems():
78 prod.location=PRE+psub
79 prod[
'locbase']=os.path.basename(PRE+psub)
85 diagname=
'wrfdiag_d%02d'%(domain.get_grid_id(),)
88 prod.location = pre+diagname
89 prod[
'locbase']=os.path.basename(pre+diagname)
90 prods[diagname] = prod
93 """!Determines if the hwrf_nhc_products program can be run yet.
94 @param silent If silent=True, does not log anything.
95 @returns True if the program can run, False otherwise"""
96 with self.dstore.transaction():
98 if not self._track.available:
100 self.
log().info(
'Cannot run: track not yet available. '
101 'Location: %s available: %d'% (
102 str(self._track.location),
103 int(self._track.available)))
113 self.
log().info(
'Cannot run: wrfdiag file %s '
114 'not available'%(str(w),))
118 """!Calculates resolution cutoff information.
120 Calculates the outermost radius from the domain center at
121 which the storm center can be, while still considered to be
122 resolved by that domain. This is used to detect failures of
123 the nest motion, and report which nests actually contain the
124 storm. Iterates over radii for each nest, yielding a radius
126 @param wrf the hwrf.wrf.WRFSimulation being run
127 @param fudge_factor fudge factor to reduce resolution to
128 politically correct values"""
134 sn=domain.nl.nl_get(
'domains',
'e_sn')
135 assert(domain.dy
is not None)
136 assert(Rearth
is not None)
137 assert(sn
is not None)
138 assert(fudge_factor
is not None)
139 yield domain.dy*Rearth*pi/180.*sn*sqrt(2.)*fudge_factor
141 """!Determines the nesting level
142 Determines the nesting level of the specified nest relative
143 to the given moad. If nest=moad, the result is 0, if nest is
144 the direct child of moad, then 1, if it is the grandchild,
146 @param moad the outermost domain
147 @param nest the nest of interest
148 @returns the nesting level, an integer"""
150 moadid=moad.get_grid_id()
152 while level<100
and dom.get_grid_id()!=moadid:
158 """!Writes the products.nml namelist file.
160 This is an internal implementation function; do not call it
161 directly. Writes the products.nml namelist to file object f.
162 @param f the opened file object
163 @param wrf the hwrf.wrf.WRFSimulation
164 @param moad the outermost domain, an hwrf.wrf.WRFDomain
165 @param inner the innermost domain, an hwrf.wrf.WRFDomain"""
168 basin1=self.
confstr(
'basin1',section=
'config').upper()
169 stnum=self.
confint(
'stnum',section=
'config')
170 assert(isinstance(wrftask.location,basestring))
178 hifreqs=[x
for x
in wrftask.products(
179 stream=
'hifreq',domains=[inner])]
185 hifreq=os.path.basename(hifreqs[0].location)
188 assert(isinstance(hifreq,basestring))
190 wrftaskloc=wrftask.location
194 dt=to_fraction(moad.dt)
195 (dti,dtn,dtd)=split_fraction(dt)
197 repl={
'dx':g(
'domains',
'dx'),
198 'dy':g(
'domains',
'dy'),
200 'time_step_fract_num':dtn,
201 'time_step_fract_den':dtd,
202 'ide':g(
'domains',
'e_we'),
203 'jde':g(
'domains',
'e_sn'),
204 'YMDH':self.
confint(
'YMDH',section=
'config'),
205 'inhifreq':os.path.join(str(wrftaskloc),hifreq),
206 'inatcf':self._track.location,
207 'domlat':self.
conffloat(
'domlat',section=
'config'),
208 'domlon':self.
conffloat(
'domlon',section=
'config'),
209 'STORM':str(self.storminfo.stormname).upper(),
210 'ATCFID':str(self.storminfo.stormid3).upper(),
211 'TierI_model':to_fortnml(self.
confstr(
'TierI_model',
'HWRF')),
212 'TierI_submodel':to_fortnml(self.
confstr(
'TierI_submodel',
214 'TierI_realtime':to_fortnml(self.
confbool(
'TierI_realtime',
216 'swathres':to_fortnml(self.
conffloat(
'swathres',0.05)),
217 'swathpad':to_fortnml(self.
conffloat(
'swathpad',0.3)),
218 'grads_byteswap':to_fortnml(self.
confbool(
'grads_byteswap',
225 if(basin1==
'L' or basin1==
'E' or basin1==
'C' or basin1==
'Q'):
226 repl[
'centername']=
'"NHC"'
228 repl[
'centername']=
'"JTWC"'
230 repl[
'fcst_len']=str(int(self.
__fcstlen))
237 inhifreq='{inhifreq}'
238 inwrfdiag='wrfdiag_d<DOMAIN>'
239 outpre='{STORM}{ATCFID}.{YMDH}.'
241 resolution_cutoffs = {rescut}
244 want_centername={centername}
253 nesting_level={nestlev}
254 grads_byteswap={grads_byteswap}
255 time_step={time_step}
256 time_step_fract_num={time_step_fract_num}
257 time_step_fract_den={time_step_fract_den}
259 submodel={TierI_submodel}
260 realtime={TierI_realtime}
265 out4wave_out='binary_d<DOMAIN>'
269 """!Convenience function that returns the product with the
270 given name, or raises KeyError if none is found."""
273 """!Iterates over wrfdiag file products
274 @param what ignored"""
275 for name,prod
in self._products.iteritems():
276 assert(isinstance(name,basestring))
281 """!Returns Product objects describing files produced by this
283 @param what if specified, the name of the product of interest.
284 Otherwise, all products are iterated over"""
289 for product
in self._products.itervalues():
292 def run(self,deliver_wrfdiag=False):
293 """!Copies inputs, runs the hwrf_nhc_products program, and delivers results.
294 @param deliver_wrfdiag if True, wrfdiag files are also delivered"""
296 wrf=self._wrftask.wrf()
301 dir=os.path.dirname(wd)
302 prefix=os.path.basename(wd)
303 with
TempDir(prefix=prefix,dir=dir,logger=logger):
304 runme=self.
getexe(
'hwrf_nhc_products')
306 with open(
'products.nml',
'wt')
as f:
309 with open(
'tmpvit',
'wt')
as f:
311 f.write(
"%s\n"%(self._vitals.as_tcvitals(),))
314 f.write(
"%s\n"%(self._vitals.as_tcvitals(),))
319 (start,end,interval)=domain.get_output_range(self.
_stream)
321 orig=[x
for x
in self._wrftask.products(
322 stream=self.
_stream,domains=[domain],time=end)]
324 orig=orig[-1].location
326 here=
'wrfdiag_d%02d'%(domain.get_grid_id(),)
328 make_symlink(orig,here,force=
True,logger=logger)
331 make_symlink(os.path.join(self._wrftask.location,
'MDstatus'),
333 if self.
confbool(
'sync_frequently',
True):
335 checkrun(openmp(exe(runme),threads=self.
confint(
336 'threads',int(os.environ.get(
'NHC_PRODUCTS_NTHREADS',
337 '1')))),logger=logger)
343 """!Modifies the swath.ctl file to point to a lower-case
345 @param ctlfile the file to modify"""
346 newfile=
'%s.lowerdat'%(ctlfile,)
347 with open(ctlfile,
'rt')
as fi:
348 with open(newfile,
'wt')
as fo:
350 m=re.match(
'^(.*DSET +\^)(.*)$',line)
351 if m: line=
"%s%s\n"%(m.group(1),m.group(2).lower())
354 def _deliver_it(self,fromloc,toloc,keep,logger):
355 """!Helper function to deliver data.
357 This is an internal implementation function used by
358 deliver_outlist. Do not call this directly.
359 @param fromloc source file location
360 @param toloc target file location
361 @param keep if True, keep the origin file
362 @param logger the logging.Logger to use"""
363 assert(fromloc
is not None)
364 assert(toloc
is not None)
365 assert(isinstance(fromloc,basestring))
366 assert(isinstance(toloc,basestring))
367 assert(keep
is not None)
368 assert(logger
is not None)
370 tobase=os.path.basename(toloc)
371 for (k,p)
in self._products.iteritems():
374 logger.info(
'%s is product %s (at %s), deliver that...'
375 %(tobase,p.did,locbase))
376 p.deliver(frominfo=fromloc,keep=keep,logger=logger)
379 logger.info(
'%s is not product %s (at %s)'
380 %(tobase,p.did,locbase))
381 logger.info(
'%s has no Product, deliver_file instead'%(toloc,))
382 deliver_file(fromloc,toloc,keep=keep,logger=logger)
385 """!Reads the "outlist" output file from hwrf_nhc_products and
386 delivers the listed files to the com directory."""
390 with open(
'outlist',
'rt')
as outlist:
391 for outfile
in outlist:
392 outfile=outfile.rstrip()
393 outfiles.append(outfile)
394 for outfile
in outfiles:
395 bn=os.path.basename(outfile)
396 if(re.search(
'\.swath.ctl',bn)):
400 with open(newctl,
'rt')
as f:
403 logger.info(
'NEWCTL: '+repr(line))
404 self.
_deliver_it(newctl,os.path.join(od,outfile.lower()),
405 keep=
False,logger=logger)
406 elif(re.search(
'\.(afos|stats.tpc|htcf|resolution|htcfstats)$',
410 assert(outfile.find(
'swath')<0)
411 logger.info(
'%s: deliver twice: as upper- and lower-case'
413 self.
_deliver_it(outfile,os.path.join(od,outfile.lower()),
414 keep=
True,logger=logger)
416 keep=
False,logger=logger)
417 elif(re.search(
'^a.*\.dat$',bn)
and re.search(
'swath',bn)<0):
419 assert(outfile.find(
'swath')<0)
421 keep=
False,logger=logger)
422 logger.info(
'%s: deliver as upper-case'%(outfile,))
425 logger.info(
'%s: deliver with original case'%(bn,))
426 self.
_deliver_it(outfile,os.path.join(od,outfile.lower()),
427 keep=
False,logger=logger)
428 for (name,prod)
in self._products.iteritems():
430 if loc
and os.path.basename(loc)==os.path.basename(outfile):
431 prod.check(logger=logger)
435 for prod
in self._products.iteritems():
437 prod.check(minage=-100)
439 if not prod.available:
440 logger.warning(
'%s: not available at %s'%(
441 prod.did,prod.location))
443 logger.info(
'%s: available at %s'%(
444 prod.did,prod.location))
445 logger.info(
'Checked %d UpstreamFile products'%nprod)
447 """!Delivers wrfdiag files to their destination."""
448 ncks=self.
getexe(
'ncks',
'')
454 def copy(src,tgt,junk):
456 checkrun(bigexe(ncks)[
'-4',
'-L',
'6',src,tgt]<
'/dev/null',
463 '{out_prefix}.{prodname}',prodname=prod.prodname))
464 src=os.path.join(self._wrftask.location,prod.prodname)
465 logger.info(
"%s: deliver to %s"%(prod.did,prod.location))
466 prod.deliver(frominfo=src,location=dest,copier=copy)
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Constants used throughout the hwrf package.
taskname
Read-only property: the name of this task.
def _make_products(self)
Generates FileProduct objects for all outputs.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
def remove_file
Deletes the specified file.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def nesting_level(self, moad, nest)
Determines the nesting level of the specified nest relative to the given...
def confbool
Alias for self.conf.getbool for section self.section.
def wrfdiag_products
Iterates over wrfdiag file products.
Defines StormInfo and related functions for interacting with vitals ATCF data.
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def get_res_cutoff
Calculates resolution cutoff information.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
outdir
The directory in which this task should deliver its final output.
Stores products and tasks in an sqlite3 database file.
Time manipulation and other numerical routines.
This class is intended to be used with the Python "with TempDir() as t" syntax.
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
def write_namelist(self, f, wrf, moad, inner)
Writes the products.nml namelist file.
def products
Returns Product objects describing files produced by this Task.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
def conffloat
Alias for self.conf.getfloat for section self.section.
def log
Obtain a logging domain.
def __init__(self, dstore, conf, section, wrftask, track, domains, vitals, stream='auxhist1', fcstlen=126, kwargs)
NHCProducts constructor.
def rewrite_swath_ctl(self, ctlfile)
Modifies the swath.ctl file to point to a lower-case swath.dat filename.
This is a wrapper around the hwrf_nhc_products Fortran program.
def product(self, name)
Convenience function that returns the product with the given name, or raises KeyError if none is foun...
def confstr
Alias for self.conf.getstr for section self.section.
def _deliver_it(self, fromloc, toloc, keep, logger)
Helper function to deliver data.
def canrun
Determines if the hwrf_nhc_products program can be run yet.
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
def deliver_wrfdiag(self)
Delivers wrfdiag files to their destination.
def deliver_outlist(self)
Reads the "outlist" output file from hwrf_nhc_products and delivers the listed files to the com direc...
def run
Copies inputs, runs the hwrf_nhc_products program, and delivers results.
Storm vitals information from ATCF, B-deck, tcvitals or message files.
Represents a Product created by an external workflow.