1 """!Runs the POM initialization and POM-WRF coupled forecast.
3 This module handles the POM-coupled WRF simulation. It contains two
6 * POMInit -- an HWRFTask that is a wrapper around the Python pom package.
7 * WRFCoupledPOM - a subclass of hwrf.fcsttask.WRFAtmos that runs the
8 WRF-POM two-way coupled system based on the output of the POMInit."""
12 __all__ = [
'POMInit',
'WRFCoupledPOM']
14 import os, shutil, math
23 COMPLETED, RUNNING, FAILED
27 from hwrf.numerics import to_datetime, to_datetime_rel, to_fraction
35 prodnames={
'grid': (
'{oceandir}/{vit[stormname]}.grid.nc',
36 '{outdir}/{out_prefix}.pom.grid.nc'),
37 'ts_initial': (
'{oceandir}/{vit[stormname]}.ts_initial.nc',
38 '{outdir}/{out_prefix}.pom.ts_initial.nc' ),
39 'ts_clim': (
'{oceandir}/{vit[stormname]}.ts_clim.nc',
40 '{outdir}/{out_prefix}.pom.ts_clim.nc' ),
41 'uv_initial': (
'{oceandir}/{vit[stormname]}.uv_initial.nc',
42 '{outdir}/{out_prefix}.pom.uv_initial.nc' ),
43 'el_initial': (
'{oceandir}/{vit[stormname]}.el_initial.nc',
44 '{outdir}/{out_prefix}.pom.el_initial.nc' ),
45 'restart.phase2': (
'{oceandir}/restart.phase2.nc',
46 '{outdir}/{out_prefix}.pom.restart.phse2.nc'
48 'pom.nml': (
'{nmldir}/pom.nml',
49 '{outdir}/{out_prefix}.pom.nml' ) }
50 """A mapping from product name to a two-element tuple. The tuple
51 contains the path to the file in the local directory structure of the
52 pom package, and the destination file within the HWRF system. Both
53 should be sent through string interpolation (strinterp or
54 timestrinterp) before use."""
57 """!A wrapper around the pom package that runs the POM initialization.
59 This HWRFTask subclass is a wrapper around the pom package. It
60 runs the POM initialization, and generates the POM namelist for
62 def __init__(self,dstore,conf,section,taskname=None,vitfile=None,
63 fcstlen=
None,outstep=86400,**kwargs):
64 """!Creates a POMInit.
65 @param dstore the produtil.datastore.Datastore to use
66 @param conf the HWRFConfig to use
67 @param section the section name for this task
68 @param taskname the task name. Default: section
69 @param vitfile the vitals file with tcvitals for all times this
70 storm has existed. Default:
71 self.icstr('{WORKhwrf}/{stormlabel}.vitals')
72 @param fcstlen The forecast length in hours.
73 @param outstep The output timestep in seconds.
74 @param kwargs Other keyword arguments are passed to the superclass constructor."""
75 if 'outdir' not in kwargs: outdir=conf.getdir(
'com')
76 super(POMInit,self).
__init__(dstore,conf,section,taskname=taskname,
79 'sfc_dataset',
'hwrfdata')))
81 'loop_dataset',
'hwrfdata')))
83 'sfcanl_item',
'gfs_sfcanl')))
85 'sanl_item',
'gfs_sanl')))
87 'loop_item',
'gfdl_loop')))
89 'wc_ring_item',
'gfdl_wc_ring')))
90 self.
_atime=to_datetime(conf.cycle)
95 vitfile=self.
icstr(
'{WORKhwrf}/{stormlabel}.vitals')
97 if 'catalog' in kwargs
and isinstance(kwargs[
'catalog'],
101 incat = str(kwargs.get(
'catalog',self.
confstr(
102 'catalog',
'hwrfdata')))
108 outputdir=os.path.join(rundir,
'output')
112 """!Runs the POM initialization and copies the results to their
113 destinations within the HWRF work area."""
119 inputdir=os.path.join(rundir,
'input')
120 outputdir=os.path.join(rundir,
'output')
121 if os.path.exists(rundir):
122 shutil.rmtree(rundir)
123 with
NamedDir(rundir,keep=
True,logger=logger)
as d:
124 with
NamedDir(inputdir,keep=
True,logger=logger)
as d:
126 with
NamedDir(outputdir,keep=
True,logger=logger)
as d:
132 logger.info(
'Basin is unsupported.')
135 except Exception
as e:
136 logger.error(
'Unhandled exception in ocean init: %s'
137 %(str(e),),exc_info=
True)
141 def _make_products(self,outdir):
142 """!Creates FileProduct objects for all output files.
143 @param outdir The directory to which the pom package output its
146 oceandir=os.path.join(outdir,
'OCEAN')
147 with self.dstore.transaction():
148 for prodname,filepaths
in prodnames.iteritems():
149 (localpath,compath)=filepaths
152 prod.location=self.
timestr(compath,atime,atime,
154 slocalpath=self.
timestr(localpath,atime,atime,
155 oceandir=oceandir,nmldir=outdir)
156 prod[
'localpath']=slocalpath
157 self.
_products[prodname]=( prod,slocalpath )
160 """!Delivers files to their final destination
161 Copies results to their destinations within the HWRF work areas
162 @param oceandir the OCEAN directory created by the pom package in run()
163 @param nmldir the directory in which the forecast namelist was made
164 @param redeliver if True, products are re-copied even if
172 for prodname,stuff
in self._products.iteritems():
173 assert(isinstance(stuff,tuple))
174 ( prod,localpath ) = stuff
175 if prod.available
and not redeliver:
176 logger.info(
'%s: already available and redeliver=False, so '
177 'skipping this (available=%s location=%s)'%(
178 prod.did,repr(prod.available),repr(prod.location)))
180 if not os.path.exists(localpath):
182 localpath+
": expected output file does not exist.")
184 baddies.append(localpath+
' (missing)')
186 elif not isnonempty(localpath):
187 logger.warning(localpath+
": is empty. Will deliver anyway."
188 +
" Beware of impending failures.")
189 baddies.append(localpath+
' (empty)')
190 prod.deliver(frominfo=localpath,keep=
False,logger=logger)
192 msg=
'Some ocean outputs were empty or missing: '+\
196 logger.critical(
'Ocean init failed: '+msg)
200 """!Internal function that passes control to the pom package
202 This internal implemenentation function passes control to
203 the pom package. This is part of the implementation of
204 run(). Do not call directly except for debugging.
205 @param inputdir the ocean input data directory
206 @param outputdir the ocean data output directory"""
207 CENTERID=self.storminfo.center.upper()
208 EXEChwrf=self.
getdir(
'EXEChwrf')
209 PARMhwrf=self.
getdir(
'PARMhwrf')
210 FIXhwrf=os.path.join(self.
getdir(
'FIXhwrf'),
'hwrf-pom')
216 STARTDATE=self._atime.strftime(
'%Y%m%d%H')
217 STORMID=self.storminfo.stormid3.upper()
218 STORMNAME=self.storminfo.stormname.upper()
219 kwargs=dict(logger=self.
log(), conf=self.
conf)
220 method=self.
confstr(
'method',
'')
221 if method: kwargs[
'input_method']=method.upper()
222 assert(GFSDIR.find(
'pom/output')<0)
224 setrlimit(logger=logger,stack=6e9,ignore=
True)
225 getrlimit(logger=logger)
226 sync_frequently=self.
confbool(
'sync_frequently',
True)
228 FIXhwrf,VITDIR,GFSDIR,LCDIR,CSTREAM,COMIN,
230 sync_frequently=sync_frequently,
234 """!Iterates over products
236 Iterates over Product objects for all of the files that need
237 to be copied to the forecast directory to run POM. The
238 products will all have a "localname" metadata value telling
239 the local filename they should have in the forecast directory.
240 @param name If given, only the product with this name is yielded"""
242 for p
in self._products.itervalues():
yield p[0]
247 """!Iterates over all needed input data."""
255 STORMID=self.storminfo.stormid3.upper()
256 BASIN=STORMID[2].upper()
264 """!Obtains input data, links or copies to places expected by POM.
266 Copies all inputs to locations expected by the pom package.
267 Copies the GFS sanl and sfcanl, waiting for them if needed.
268 Makes a new tcvitals file by parsing the old one, and
269 generating a new one, discarding lines containing "INVEST"."""
274 with self.dstore.transaction()
as t:
276 atime=atime,logger=logger)
278 atime=atime,logger=logger)
286 sfcanl.location=sfcanlx
287 sfcanl.available=
False
289 names={ sanl:self.
timestr(
'gfs.t{aHH}z.sanl',0,atime=self.
_atime),
290 sfcanl:self.
timestr(
'gfs.t{aHH}z.sfcanl',0,
292 def namer(p,logger,*a):
return names[p]
293 def actor(p,name,logger,*a): make_symlink(p.location,name,
294 force=
True,logger=logger)
295 wait_for_products([sanl,sfcanl],logger,namer,actor)
298 maxback=max(1,self.
confint(
'loop_current_max_days_back',30))
300 for idelta
in xrange(maxback):
302 looptime=to_datetime_rel(hdelta,atime)
303 stime=looptime.strftime(
'%Y%m%d%H')
305 atime=looptime,logger=logger)
306 wc_ring=self._catalog.locate(
310 if not isnonempty(loop):
312 logger.warning(
'%s (loop at time %s): is empty or '
313 'non-existant'%(str(loop),stime))
314 if not isnonempty(wc_ring):
316 logger.warning(
'%s (loop wc_ring at time %s): is empty or '
317 'non-existant'%(str(wc_ring),stime))
320 looploc=self.
timestr(
'hwrf_gfdl_loop_current_rmy5.dat.{aYMD}',
322 make_symlink(loop,looploc,logger=logger)
324 'hwrf_gfdl_loop_current_wc_ring_rmy5.dat.{aYMD}',
326 make_symlink(wc_ring,wc_ringloc,logger=logger)
328 logger.critical(
'No loop current available. Checked %d day(s) '
329 'for loop current for %s'
330 %(maxback,atime.strftime(
'%Y%m%d')))
333 vitdest=
'syndat_tcvitals.%04d'%(self.storminfo.when.year,)
334 logger.info(
'Copy vitals %s to %s'%(self.
_vitfile,vitdest))
335 with open(vitdest,
'wt')
as outf:
336 with open(self.
_vitfile,
'rt')
as inf:
338 if line.find(
'INVEST')>=0:
344 """!This is an internal implementation class that should never be
345 used directly. It instructs the hwrf.coupling.CoupledWRF to call
346 the WRFCoupledPOM.copy_pom_inputs to check or link POM input
349 """Creates a POMIniter that will pass control to the given
350 WRFCoupledPOM object, stored as self.wcp."""
353 """Calls the WRFCoupledPOM.copy_pom_inputs with just_check=True."""
354 return self.wcp.copy_pom_inputs(just_check=
True)
356 """Calls the WRFCoupledPOM.copy_pom_inputs passing just_check."""
357 return self.wcp.copy_pom_inputs(bool(just_check))
361 """!Runs a WRF-POM coupled simulation.
363 Most of the work of this class is done by the superclass,
364 WRFAtmos. This class adds code to copy the inputs needed by POM
365 and the coupler. There are three critical new config section
368 * wm3c_ranks = number of coupler ranks. Default: 1
369 * pom_ranks = number of POM ranks. Default: 9
370 * wrf_ranks = nubmer of WRF ranks. No default. This one is
372 def __init__(self,dstore,conf,section,wrf,keeprun=True,
373 wrfdiag_stream=
'auxhist1',pominit=
None,**kwargs):
374 """!WRFCoupledPOM constructor.
375 @param dstore the produtil.datastore.Datastore to use
376 @param conf the hwrf.config.HWRFConfig that provides configuration ifnormation
377 @param section the config section in conf
378 @param wrf the hwrf.wrf.WRFSimulation object that is being run
379 @param keeprun if True, the simulation temporary files are retained
380 @param wrfdiag_stream the stream that generates wrfdiag files
381 @param pominit The POMInit object.
382 @param kwargs passed to hwrf.fcsttask.WRFAtmos.__init__"""
383 if not isinstance(pominit,POMInit):
385 'The pominit argument to WRFCoupledPOM.__init__ must be a '
386 'POMInit object. You passed a %s %s.'%
387 (type(pominit).__name__,repr(pominit)))
388 super(WRFCoupledPOM,self).
__init__(dstore,conf,section,wrf,keeprun,
389 wrfdiag_stream,**kwargs)
392 self.
couple(
'coupler',
'hwrf_wm3c',
'wm3c_ranks',1)
394 self.
couple(
'pom',
'hwrf_ocean_fcst',
'ocean_ranks',9,pominiter)
395 self.
couple(
'wrf',
'wrf',
'wrf_ranks')
398 0,86400,172800,259200,345600,432000])
400 def remove_ocean(self):
401 """!Removes the ocean component from coupling.
402 @post Any call to run() will not include ocean coupling."""
407 """!Returns the POMInit object for this coupled forecast."""
411 """!Internal function for adding wave coupling. This must be
412 implemented by a subclass.
417 """!Copies or checks for the inputs required by the POM model.
418 This is an internal function used by the PomIniter class. Do
421 @param just_check If just_check=True, the inputs are not
422 copied; instead, the routine just checks for them. Do not use
423 just_check: call check_inputs instead."""
425 logger.info(
'Copying POM inputs from POMInit task %s'
426 %(self._pominit.taskname,))
428 for prod
in self._pominit.products():
430 if not prod.available: prod.check(logger=logger)
431 localname=prod.meta(
'localpath',
'')
435 msg=
'POM product %s (available=%s location=%s) has no '\
436 'localname.'%(prod.did,repr(avail),repr(loc))
441 raise POMInputError(msg)
443 msg=
'POM product %s (available=%s location=%s localname=%s)'\
444 ' is not available.'\
445 %(prod.did,repr(avail),repr(loc),repr(localname))
451 raise POMInputError(msg)
453 msg=
'POM product %s (available=%s location=%s localname=%s)'\
455 %(prod.did,repr(avail),repr(loc),repr(localname))
461 raise POMInputError(msg)
463 deliver_file(loc,os.path.basename(localname),keep=
True,
467 msg=
'No outputs reported by POM initialization.'\
468 ' Did you forget to run the ocean init?'
474 raise POMInputError(msg)
475 logger.info(
'Copied %d POM inputs. Returning True.'%(n_copied,))
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def run_init(self, inputdir, outputdir)
Internal function that passes control to the pom package.
def products
Iterates over products.
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def pominit(self)
Returns the POMInit object for this coupled forecast.
def component(self, which)
def add_coupled_stream(self, stream, times)
Products.
def link_coupled_inputs(self, just_check, logger)
def confbool
Alias for self.conf.getbool for section self.section.
def _add_wave(self)
Internal function for adding wave coupling.
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
A piece of data produced by a Task.
outdir
The directory in which this task should deliver its final output.
Runs the real_nmm or wrf executables.
Raised when an unsupported basin is requested.
Stores products and tasks in an sqlite3 database file.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
This module allows querying resource usage and limits, as well as setting resource limits...
def deliver_products
Delivers files to their final destination Copies results to their destinations within the HWRF work a...
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
Main script to for running ocean spin up: Phase1 and Phase2 (also known as Phase3 and Phase4)...
def timestr(self, string, ftime, atime=None, section=None, kwargs)
Expands a string in the given conf section, including time vars.
def get_inputs(self)
Obtains input data, links or copies to places expected by POM.
def log
Obtain a logging domain.
A wrapper around the pom package that runs the POM initialization.
def __init__(self, dstore, conf, section, wrf, keeprun=True, wrfdiag_stream='auxhist1', pominit=None, kwargs)
WRFCoupledPOM constructor.
Raised when the ocean init did not produce some expected outputs.
def copy_pom_inputs
Copies or checks for the inputs required by the POM model.
def run_init(STORMNAME, STORMID, STARTDATE, EXEChwrf, PARMhwrf, FIXhwrf, VITDIR, GFSDIR, LCDIR, CSTREAM, COMIN, init_method=None, logger=None, fcstlen=None, outstep=None, sync_frequently=False, kwargs)
Run the ocean initialization.
This module contains exception classes for reporting errors in the POM initialization.
def uncouple
Removes a component, or all components, from the coupling.
Exceptions raised by the hwrf package.
def run(self)
Runs the POM initialization and copies the results to their destinations within the HWRF work area...
def _make_products(self, outdir)
Creates FileProduct objects for all output files.
def confstr
Alias for self.conf.getstr for section self.section.
def __init__(self, dstore, conf, section, taskname=None, vitfile=None, fcstlen=None, outstep=86400, kwargs)
Creates a POMInit.
def inputiter(self)
Iterates over all needed input data.
def check_coupled_inputs(self, logger)
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
Runs a WRF-POM coupled simulation.
Represents a Product created by an external workflow.
This is an internal implementation class that should never be used directly.