1 """This module handles WW3 related scripts for HWRF system."""
3 __all__ = [
'WW3Init',
'WRFWW3POM',
'WW3Post']
13 from produtil.run import runstr, checkrun, exe, bigexe, alias
14 from hwrf.numerics import to_datetime, to_datetime_rel, to_fraction, to_timedelta
18 'mod_def': (
'./mod_def.ww3',
'{com}/{out_prefix}.mod_def.ww3' ),
19 'wind': (
'./wind.ww3',
'{com}/{out_prefix}.wind.ww3' ),
20 'current': (
'./current.ww3',
'{com}/{out_prefix}.current.ww3' ),
21 'restart': (
'./restart.ww3',
'{com}/{out_prefix}.restart_init.ww3' ),
22 'ww3_shel': (
'./ww3_shel.inp',
'{com}/{out_prefix}.ww3_shel.inp' ) }
26 def __init__(self,dstore,conf,section,taskname=None,fcstlen=126,
27 outstep=21600, rststep=21600, **kwargs):
29 dstore - the produtil.datastore.Datastore to use
30 conf - the HWRFConfig to use
31 section - the section name for this task
32 taskname - the task name. Default: section
33 fcstlen - the forecast length in hours
34 outstep - the output step in seconds
35 rststep - the restart output step in seconds
36 Other keyword arguments are passed to the superclass constructor."""
37 super(WW3Init,self).
__init__(dstore,conf,section,taskname=taskname,**kwargs)
42 def _make_products(self):
43 """Creates FileProduct objects for all output files. The
44 outdir is the directory to which the pom package output its
48 with self.dstore.transaction():
49 for prodname,filepaths
in prodnames.iteritems():
50 (localpath,compath)=filepaths
53 prod.location=self.
timestr(compath,atime,atime)
54 prod[
'localpath'] = localpath
55 self.
_products[prodname]=( prod,localpath )
57 """Iterate over all products."""
58 for prodname,stuff
in self._products.iteritems():
59 (prod,localpath)=stuff
60 if name
is None or name==prod.prodname:
64 atime=to_datetime(self.conf.cycle)
65 etime=to_datetime_rel(3600*self.
fcstlen,atime)
66 interval=to_fraction(self.
confint(
'input_step',6*3600))
68 dataset=self.
confstr(
'gfs_dataset',
'gfs')
69 item=self.
confstr(
'gfs_item',
'gfs')
71 epsilon=to_timedelta(interval/10)
72 ende=to_datetime_rel(epsilon,etime)
75 yield dict(self.
taskvars,dataset=dataset,item=item,ftime=when,atime=atime)
76 when=to_datetime_rel(interval,when)
78 def gfsgrib2iter(self):
80 atime=to_datetime(self.conf.cycle)
81 etime=to_datetime_rel(self.
fcstlen*3600,atime)
82 interval=to_fraction(self.
confint(
'input_step',6*3600))
83 dataset=self.
confstr(
'gfs_dataset',
'gfs')
84 item=self.
confstr(
'gfs_item',
'gfs')
85 hd=self.
confstr(
'catalog',
'hwrfdata')
87 epsilon=to_timedelta(interval/10)
88 ende=to_datetime_rel(epsilon,etime)
91 maxwait=self.
confint(
'max_grib_wait',1800)
92 sleeptime=self.
confint(
'grib_sleep_time',20)
93 min_size=self.
confint(
'min_grib_size',1)
94 min_mtime_age=self.
confint(
'min_grib_age',30)
96 thefile=dc.locate(dataset=dataset,item=item,ftime=when,atime=atime,**self.
taskvars)
98 waited=wait_for_files(
99 [thefile],logger,maxwait=maxwait,sleeptime=sleeptime,
100 min_size=min_size,min_mtime_age=min_mtime_age)
102 msg=
'%s: did not exist or was too small after %d seconds'%(
104 self.
log().error(msg)
107 fhour=fhour+interval/3600
108 when=to_datetime_rel(interval,when)
110 def deliver_products(self):
112 for prodname,stuff
in self._products.iteritems():
113 (prod,localpath)=stuff
114 prod.deliver(frominfo=localpath,keep=
False,logger=logger)
117 """Runs the WW3 initialization"""
120 usegfswind=self.
confstr(
'usegfswind',
'yes')
121 if usegfswind ==
'yes':
123 elif usegfswind ==
'no':
127 logger.warning(
'Wrong usegfswind value: %s. Assume usegfswind=yes.'
128 'Set dummywind to False.'%(usegfswind,))
133 redirect=self.
confbool(
'redirect',
True)
137 make_symlink(s,t,force=
True,logger=logger)
138 deliver_file(self.
icstr(
'{grid_inp}'),
'ww3_grid.inp',keep=
True,logger=logger)
139 link(self.
icstr(
'{grid_bot}'),
'.')
140 link(self.
icstr(
'{grid_msk}'),
'.')
141 link(self.
icstr(
'{grid_obr}'),
'.')
142 link(self.
getexe(
'ww3_grid'),
'ww3_grid')
144 cmd=exe(
'./ww3_grid')
145 if redirect: cmd = cmd>=
'ww3_grid.log'
146 checkrun(cmd,logger=logger)
148 if usegfswind ==
'yes':
150 ncfile=
'gfs.uvgrd10m.nc'
152 cmd=alias(bigexe(self.
getexe(
'wgrib2',
'wgrib2')))
154 logger.info(
'Extracting wind at 10 m from %s'%(f))
156 for line
in runstr(cmd[f],logger=logger).splitlines(
True):
157 if re.search(
':[UV]GRD:10 m above ground:',line):
159 runme=cmd[f,
'-i',
'-append',
'-netcdf', ncfile] << subset
160 checkrun(runme, logger=logger)
166 produtil.log.jlogger.warning(
167 'ww3init: will use dummy wind because %s is missing '
168 'or empty.'%(ncfile,))
172 deliver_file(self.
icstr(
'{wind_inp}'),
'ww3_prep.inp',keep=
True,logger=logger)
173 link(self.
getexe(
'ww3_prep'),
'ww3_prep')
175 cmd=exe(
'./ww3_prep')
176 if redirect: cmd = cmd>=
'ww3_prep_wind.log'
177 checkrun(cmd,logger=logger)
180 deliver_file(self.
icstr(
'{prnc_inp_gfswind}'),
'ww3_prnc.inp',keep=
True,logger=logger)
181 link(self.
getexe(
'ww3_prnc'),
'ww3_prnc')
182 cmd=exe(
'./ww3_prnc')
183 if redirect: cmd = cmd>=
'ww3_prnc_wind.log'
184 checkrun(cmd,logger=logger)
188 deliver_file(self.
icstr(
'{curr_inp}'),
'ww3_prep.inp',keep=
True,logger=logger)
189 link(self.
getexe(
'ww3_prep'),
'ww3_prep')
191 cmd=exe(
'./ww3_prep')
192 if redirect: cmd = cmd>=
'ww3_prep_curr.log'
193 checkrun(cmd,logger=logger)
196 logger.error(
'Not implemented yet')
201 oldrst=self.
icstr(
'{oldcom}/{oldvit[stormid3lc]}.restart.f006.ww3')
205 produtil.log.jlogger.info(
'%s: warm start for wave'%(oldrst,))
207 produtil.log.jlogger.warning(
208 'restart.ww3: will generate dummy because %s is missing '
209 'or empty.'%(oldrst,))
210 except Exception
as ee:
211 produtil.log.jlogger.warning(
212 'restart.ww3: will generate dummy because %s is missing '
213 'or could not be copied; %s'%(oldrst,str(ee)),exc_info=
True)
216 logger.info(
'restart.ww3: generating dummy with ww3_strt')
218 deliver_file(self.
icstr(
'{strt_inp}'),
'ww3_strt.inp',keep=
True,logger=logger)
219 link(self.
getexe(
'ww3_strt'),
'ww3_strt')
220 cmd=exe(
'./ww3_strt')
221 if redirect: cmd = cmd>=
'ww3_strt.log'
222 checkrun(cmd,logger=logger)
228 shel_inp=self.
icstr(
'{shel_inp}')
229 atime=to_datetime(self.conf.cycle)
230 etime=to_datetime_rel(self.
fcstlen*3600,atime)
231 ci=self.conf.getfloat(
'config',
'cycling_interval',6)
232 retime=to_datetime_rel(ci*3600*1,atime)
234 invars.update(RUN_BEG=atime.strftime(
'%Y%m%d %H%M%S'),
235 RUN_END=etime.strftime(
'%Y%m%d %H%M%S'),
236 FLD_BEG=atime.strftime(
'%Y%m%d %H%M%S'),
237 FLD_END=etime.strftime(
'%Y%m%d %H%M%S'),
239 PNT_BEG=atime.strftime(
'%Y%m%d %H%M%S'),
240 PNT_END=etime.strftime(
'%Y%m%d %H%M%S'),
242 RST_BEG=atime.strftime(
'%Y%m%d %H%M%S'),
243 RST_END=retime.strftime(
'%Y%m%d %H%M%S'),
246 with open(shel_inp,
'rt')
as nf:
247 with open(
'ww3_shel.inp',
'wt')
as of:
248 of.write(ni.parse(nf,logger=logger,source=shel_inp,
249 raise_all=
True,atime=self.conf.cycle,**invars))
253 except Exception
as e:
254 logger.error(
'Unhandled exception in wave init: %s'
255 %(str(e),),exc_info=
True)
262 for lf
in [
'ww3_grid.log',
'ww3_prep_wind.log',
'ww3_prep_curr.log'
264 comloc=self.
icstr(
'{com}/{out_prefix}.{lf}.ww3',lf=lf)
265 if os.path.exists(lf):
266 deliver_file(lf,comloc,keep=
True,logger=logger)
270 """This is an internal implementation class that should never be
271 used directly. It instructs the hwrf.coupling.CoupledWRF to call
272 the WRFWW3POM.copy_ww3_inputs to check or link WW3 input data."""
274 """Creates a WW3Initer that will pass control to the given
275 WRFWW3POM object, stored as self.wcp."""
278 """Calls the WRFWW3POM.copy_ww3_inputs with just_check=True."""
279 return self.wcp.copy_ww3_inputs(just_check=
True)
281 """Calls the WRFWW3POM.copy_ww3_inputs passing just_check."""
282 return self.wcp.copy_ww3_inputs(bool(just_check))
286 def copy_ww3_inputs(ww3init,just_check=False):
289 for prod
in ww3init.products():
290 localname=prod[
'localpath']
293 if not avail
or not loc
or not localname:
295 localname=prod[
'localpath']
298 if not avail
or not loc
or not localname:
299 msg=
'WW3 product %s (available=%s location=%s localname=%s)'\
300 ' is not available or has an empty location'%(
301 prod.did,repr(prod.available), repr(prod.location),
310 deliver_file(loc,os.path.basename(localname),keep=
True,
313 logger.info(
'Copied %d WW3 inputs. Returning True.'%(n_copied))
316 def add_ww3_product(task,rstbeg,rstdt,rstend,stream='ww3rst',fmt='restart%03d.ww3'):
317 atime=task.conf.cycle
318 beg=to_datetime_rel(rstbeg,atime)
319 end=to_datetime_rel(rstend,atime)
320 dt=to_fraction(rstdt)
322 eend=to_datetime_rel(epsilon,end)
328 now=to_datetime_rel(dt,now)
329 task.add_coupled_stream(stream,times)
331 with task.dstore.transaction()
as t:
341 location=os.path.join(task.location,filename)
345 prodname=prodname,location=location)
346 prod[
'stream']=stream
347 prod[
'location']=location
349 prod[
'restarttime']=round(to_fraction(time-atime)/3600)
352 task.add_coupled_product(stream,time,prod)
355 task.log().info(
'Created %d output %s products from %s to %s'%(
356 itime, stream, beg.strftime(
'%Y%m%d%H%M'), end.strftime(
'%Y%m%d%H%M')))
360 """Runs a WRF-WW3 coupled simulation (no ocean). Most of the work
361 of this class is done by the superclasses. This class adds code
362 to copy the inputs needed by WW3 and the coupler. There are three
363 critical new config section values:
365 wm3c_ranks = number of coupler ranks. Default: 1
366 ww3_ranks = number of WW3 ranks. Default: 24
367 wrf_ranks = nubmer of WRF ranks. No default. This one is
369 def __init__(self,dstore,conf,section,wrf,keeprun=True,
370 wrfdiag_stream=
'auxhist1',ww3init=
None,**kwargs):
371 if not isinstance(ww3init,WW3Init):
373 'The ww3init argument to WRFCoupledWW3.__init__ must be a '
374 'WW3Init object. You passed a %s %s.'%
375 (type(ww3init).__name__,repr(ww3init)))
376 super(WRFCoupledWW3,self).__init__(dstore,conf,section,wrf,keeprun,
377 wrfdiag_stream,**kwargs)
380 self.
couple(
'coupler',
'hwrf_wm3c',
'wm3c_ranks',1)
381 self.
couple(
'ww3',
'hwrf_ocean_fcst',
'ww3_ranks',48,ww3initer)
382 self.
couple(
'wrf',
'wrf',
'wrf_ranks')
384 ci=self.conf.getfloat(
'config',
'cycling_interval',6)
385 rstdt=ww3init.rststep
388 add_ww3_product(self,rstdt,rstdt,ci*1*3600,
'ww3rst',
'restart%03d.ww3')
390 add_ww3_product(self,wrf.simend(),3600,wrf.simend(),
'ww3out',
'out_grd.ww3')
391 add_ww3_product(self,wrf.simend(),3600,wrf.simend(),
'ww3mdldef',
'mdl_def.ww3')
392 def remove_wave(self):
395 def copy_ww3_inputs(self,just_check=False):
396 return copy_ww3_inputs(self.
ww3init,just_check)
401 def __init__(self,dstore,conf,section,wrf,keeprun=True,
402 wrfdiag_stream=
'auxhist1',pominit=
None,ww3init=
None,
404 if not isinstance(ww3init,WW3Init):
406 'The ww3init argument to WRFCoupledWW3.__init__ must be a '
407 'WW3Init object. You passed a %s %s.'%
408 (type(ww3init).__name__,repr(ww3init)))
411 super(WRFWW3POM,self).__init__(dstore,conf,section,wrf,keeprun,wrfdiag_stream,pominit,**kwargs)
413 ci=self.conf.getfloat(
'config',
'cycling_interval',6)
414 rstdt=ww3init.rststep
417 add_ww3_product(self,rstdt,rstdt,ci*1*3600,
'ww3rst',
'restart%03d.ww3')
419 add_ww3_product(self,wrf.simend(),3600,wrf.simend(),
'ww3out',
'out_grd.ww3')
420 add_ww3_product(self,wrf.simend(),3600,wrf.simend(),
'ww3moddef',
'mod_def.ww3')
422 def remove_ocean(self):
425 def remove_wave(self):
431 def copy_ww3_inputs(self,just_check=False):
432 return copy_ww3_inputs(self.
ww3init,just_check)
436 """Run WW3 post-process."""
437 def __init__(self,ds,conf,section,poststep,ww3,**kwargs):
438 super(WW3Post,self).__init__(ds,conf,section,**kwargs)
444 def products(self,**kwargs):
446 redirect=self.
confbool(
'redirect',
True)
449 def make_products(self):
450 redirect=self.
confbool(
'redirect',
True)
457 def __copy_ounf(self,source,target,ignore):
461 checkrun(bigexe(ncks)[
'-4',
'-L',
'6',source,target]<
'/dev/null',
466 """Returns the path to ncks. Returns None if ncks cannot be
467 found. This function will only search for ncks once, and will
468 cache the result. Set self._ncks_path=False to force a
471 ncks=self.
getexe(
'ncks',
'')
474 assert(ncks
is None or
475 (isinstance(ncks,basestring)
and ncks!=
''))
480 """Run the WW3 post."""
482 redirect=self.
confbool(
'redirect',
True)
485 with
NamedDir(self.
workdir,keep=
True,logger=logger,rm_first=
True)
as d:
486 mdprod=[ p
for p
in self.ww3.products(stream=
'ww3moddef') ] [0]
487 ogprod=[ p
for p
in self.ww3.products(stream=
'ww3out') ] [0]
489 if not mdprod
or not mdprod.available
or not mdprod.location:
490 logger.error(
'%s: mod_def.ww3 not yet available from forecast'%(
493 if not ogprod
or not ogprod.available
or not ogprod.location:
494 logger.error(
'%s: mod_def.ww3 not yet available from forecast'%(
496 make_symlink(mdprod.location,
'mod_def.ww3')
497 make_symlink(ogprod.location,
'out_grd.ww3')
498 make_symlink(self.
getexe(
'ww3_ounf'),
'ww3_ounf')
504 cmd=exe(
'./ww3_ounf')
505 if redirect: cmd = cmd>=
'ww3_ounf.log'
506 checkrun(cmd,logger=logger)
509 ww3_ounf_out=self.conf.cycle.strftime(
'ww3.%Y.nc')
510 ww3_ounf_out_com=self.
icstr(
'{com}/{out_prefix}.ww3_ounf.nc')
512 self._ww3ounfprod.deliver(frominfo=ww3_ounf_out,
513 location=ww3_ounf_out_com,
516 except Exception
as e:
518 logger.error(
"WW3 post failed: %s"%(str(e),),exc_info=
True)
521 def make_ounf_inp(self,logger):
524 ounf_inp=self.
confstr(
'ounf_inp',
'')
525 if not ounf_inp: ounf_inp=self.
icstr(
'{PARMhwrf}/ww3_ounf.inp_tmpl')
526 atime=to_datetime(self.conf.cycle)
528 invars.update(FLD_BEG=atime.strftime(
'%Y%m%d %H%M%S'),
531 with open(ounf_inp,
'rt')
as nf:
532 with open(
'ww3_ounf.inp',
'wt')
as of:
533 of.write(ni.parse(nf,logger=logger,source=ounf_inp,
534 raise_all=
True,atime=self.conf.cycle,**invars))
Runs the POM initialization and POM-WRF coupled forecast.
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
def check_coupled_inputs(self, logger)
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def __copy_ounf(self, source, target, ignore)
taskname
Read-only property: the name of this task.
def make_ounf_inp(self, logger)
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
def remove_file
Deletes the specified file.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def products(self, name=None, kwargs)
def confbool
Alias for self.conf.getbool for section self.section.
section
The confsection in self.section for this HWRFTask (read-only)
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def link_coupled_inputs(self, just_check, logger)
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Stores products and tasks in an sqlite3 database file.
def to_datetime(d)
Converts the argument to a datetime.
This subclass of TempDir takes a directory name, instead of generating one automatically.
Time manipulation and other numerical routines.
def deliver_products(self)
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
def timestr(self, string, ftime, atime=None, section=None, kwargs)
Expands a string in the given conf section, including time vars.
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
def log
Obtain a logging domain.
Insert config file data into a Fortran namelist file.
def __init__(self, dstore, conf, section, taskname=None, fcstlen=126, outstep=21600, rststep=21600, kwargs)
def uncouple
Removes a component, or all components, from the coupling.
Exceptions raised by the hwrf package.
def confstr
Alias for self.conf.getstr for section self.section.
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
def taskvars(self)
The dict of object-local values used for string substitution.
def realtime(self)
Is this job a real-time forecast job?
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
Runs a WRF-POM coupled simulation.
Represents a Product created by an external workflow.