1 """!PrepHybrid runs the prep_hybrid program to transform GFS spectral files to the HWRF grid."""
15 from hwrf.numerics import to_datetime, to_datetime_rel, to_timedelta, \
16 to_fraction, timedelta_epsilon, TimeMapping, partial_ordering, \
18 from produtil.fileop import deliver_file, make_symlink, fortlink, realcwd, \
24 """!Runs the prep_hybrid program on GFS spectral files.
26 Tracks a sequence of GFS spectral files for a list of times. Runs
27 prep_hybrid on them, one by one. This can be done by calling
28 run(), which runs prep_hybrid for all of them before returning, or
29 runpart() which runs only one before returning. If self.realtime
30 is True, then this class will wait for data to appear."""
31 def __init__(self,dstore,conf,section,wrf,geogdata,atime=None,
32 ftime=
None,taskname=
None,times=
None,inputs=
None,
33 in_item=
None,in_dataset=
None,one_time=
False,
34 in_anl_item=
None,**kwargs):
35 """!PrepHybrid constructor.
36 @param dstore the produtil.datastore.Datastore database
37 @param conf the hwrf.config.HWRFConfig for config info
38 @param section the section to use in conf
39 @param wrf the hwrf.wrf.WRFSimulation being run
40 @param geogdata the geogrid data product
41 @param atime,ftime analysis and forecast times
42 @param taskname taskname in the dstore
43 @param times prep_hybrid input/output times
44 @param inputs the hwrf.input.DataCatalog with the input data
45 @param in_item,in_dataset Input item and dataset
46 @param in_anl_item alternate input item for analysis time
47 @param one_time If True, only one time is prepped but we lie
48 and say it should be used for all times. This is used to trick
49 the HWRF into running with the GDAS 9hr forecast even though
50 a 12hr boundary condition would be needed
51 @param kwargs More keyword arguments are passed to hwrf.hwrftask.HWRFTask.__init__"""
52 if taskname
is None: taskname=section
54 super(PrepHybrid,self).
__init__(dstore,conf,section,taskname,**kwargs)
57 self.
times=[ x
for x
in wrf.bdytimes() ]
59 self.
times=sorted(times)
62 hd=self.
confstr(
'catalog',
'hwrfdata')
65 self.
times,default=to_fraction(wrf.bdystep())/10)
66 if in_dataset
is None: in_dataset=self.
confstr(
'dataset')
68 if in_item
is None: in_item=self.
confstr(
'item')
69 if in_anl_item
is None:
70 in_anl_item=self.
confstr(
'anl_item',
'')
75 if atime
is None: atime=wrf.simstart()
76 if ftime
is None: ftime=atime
139 """!Iterates over the list of data needed to run this class."""
147 item=in_item,ftime=t,atime=atime)
150 """!Finds the input needed for the specified time.
152 @param when the time of interest
153 @returns the spectral file at that time."""
159 when=to_datetime_rel(when,ftime)
162 logger.info(
'Within dt epsilon: %s %s'%(when,atime))
165 logger.info(
'NOT within dt epsilon: %s %s'%(when,atime))
169 "Check for dataset=%s item=%s ftime=%s atime=%s in %s"
171 when.strftime(
"%Y%m%d%H"),
172 atime.strftime(
"%Y%m%d%H"), repr(self.
inputs) ))
173 return self.inputs.locate(self.
in_dataset,in_item,ftime=when,
176 """!Guesses the WRF I/O form that was used to run geogrid
177 @returns the io_form for auxinput2"""
178 return self.wrf.io_form_for(
'auxinput2')
180 """!Constructs the prep_hybrid namelist in the Conf2Namelist
183 siu=self.nl.nl_set_if_unset
184 wg=self.wrf.nl.nl_get
185 wtg=self.wrf.nl.trait_get
187 siu(
'prmfld',
'ntimes',1)
188 siu(
'domain',
'levels',wg(
'domains',
'eta_levels'))
189 siu(
'domain',
'ptsgm',float(wg(
'domains',
'ptsgm')))
190 siu(
'domain',
'p_top_requested',float(wtg(
'ptop'))/100.0)
192 """!Creates product objects for later file delivery.
194 Creates the produtil.datastore.FileProduct objects for
195 delivery of prep_hybrid output."""
201 location=os.path.join(outdir,
'hwrfinit_0'))
202 for itime
in xrange(len(self.
times)):
203 time=self.
times[itime]
204 assert(time
is not None)
206 location=os.path.join(outdir,
'hwrfbcs_%d'%(itime,)))
207 self.
logprod[time]=os.path.join(outdir,
'prep_%d.log'%(itime,))
209 """!Return the FileProduct for the prep_hybrid initial time output file."""
212 """!Iterates over the FileProduct for the prep_hybrid boundary output files."""
213 for prod
in self.bdyprod.itervalues():
216 """!Iterates over all output products.
218 Iterates over all products meeting the requirements.
219 @param time the time of interest, or None for all times
220 @param name "init" for initial state, "bdy" for boundary
221 information, or None for all."""
222 if name
is None or name==
'init':
223 dt=to_timedelta(abs(to_fraction(time-self.wrf.simstart())))
224 if time
is None or dt<self.
_epsilon:
226 if name
is None or name==
'bdy':
229 for (time,prod)
in bdys.iteritems():
yield prod
231 when=bdys.neartime(time,self.
_epsilon)
232 if when
in bdys:
yield bdys[when]
234 """!Marks all products as unavailable, but does not delete
239 """!Deletes all temporary files (the self.workdir)."""
240 self.rmtree(os.path.join(self.
workdir))
242 """!Preps all times, even if they have already been prepped.
243 @param keep if True, copy output files instead of moving them
244 to the destination."""
247 for now
in self.
times:
248 logger.info(
'time %s: prep %s'%(now.strftime(
'%Y%m%d-%H%M%S'),
250 for ipiece
in xrange(len(self.
times)):
254 """!Preps one time that has not yet been prepped, and returns.
255 @param keep if True, copy output files instead of moving them
256 to the destination."""
258 for ipiece
in xrange(len(self.
times)):
259 now=self.
times[ipiece]
260 if not self.
bdyprod[now].available
or ipiece==0
and \
261 not self.initprod.available:
263 'time %s (piece %d): not done; process this one'
264 %(now.strftime(
'%Y%m%d-%H%M%S'),ipiece))
268 except Exception
as e:
269 logger.warning(
'time %s (piece %d): %s'%(
270 now.strftime(
'%Y%m%d-%H%M%S'),ipiece,str(e)),
274 self.
log().info(
'time %s (piece %d): already complete'
275 %(now.strftime(
'%Y%m%d-%H%M%S'),ipiece))
277 """!This is the internal helper function that runs the
278 prep_hybrid for self.run and self.runpart.
279 @param ipiece the index of the output time to run
280 @param keep if True, copy output files instead of moving them
281 to the destination."""
283 cenla=self.conf.getfloat(
'config',
'domlat')
284 cenlo=self.conf.getfloat(
'config',
'domlon')
285 now=self.
times[ipiece]
287 assert(isinstance(prepme,basestring))
288 if self.
realtime and not wait_for_files(
290 maxwait=self.
confint(
'max_spectral_wait',1800),
291 sleeptime=self.
confint(
'spectral_sleep_time',20),
292 min_size=self.
confint(
'min_spectral_size',int(4e7)),
293 min_mtime_age=self.
confint(
'min_spectral_age',30),
297 msg=
'Some input spectral files do not exist. Giving up.'
301 stime=now.strftime(
'%Y%m%d-%H%M%S')
302 geoloc=self.geogdata.location
303 if not self.geogdata.available:
304 raise NoGeogData(
'%s product: WPS geogrid data is not yet '
305 'available. (loc=%s)'%(
306 str(self.geogdata.did), repr(self.geogdata.location)))
307 moad=self.wrf.get_moad()
310 ftime=now,workdir=self.
workdir)
312 logger.info(
'%s: prep in directory %s',stime,realcwd())
313 make_symlink(self.geogdata.location,
'geogrid.out',
314 logger=logger,force=
True)
315 with open(
'dloc',
'wt')
as f: f.write(
"%f\n%f\n"%(cenla,cenlo))
316 with open(
'itime',
'wt')
as f: f.write(
"%d\n"%(ipiece,))
317 bdyfile=
'../hwrfbcs00_%d'%(ipiece,)
318 fortlink({52:bdyfile,
321 45:
'dloc', },logger=logger,force=
True)
323 initfile=
'../hwrfinit_%d'%(ipiece,)
324 fortlink({53:initfile},logger=logger,force=
True)
325 with open(
'hwrf_na12_mkbnd.parm',
'wt')
as f:
326 f.write(self.nl.make_namelist(section_sorter=partial_ordering(
327 [
'rgrid',
'prmfld',
'domain'])))
332 ex=( bigexe(self.
getexe(
'hwrf_prep')).env(
334 OMP_STACKSIZE=
"128M",
335 MKL_NUM_THREADS=
'1' )[
336 moad.nl.nl_get(
'domains',
'e_we'),
337 moad.nl.nl_get(
'domains',
'e_sn'),
338 moad.nl.nl_get(
'domains',
'e_vert'),
339 moad.nl.nl_get(
'domains',
'dx'),
340 moad.nl.nl_get(
'domains',
'dy'),
342 <
'hwrf_na12_mkbnd.parm' ) >=
'prep.log'
343 threads=self.
confint(
'threads',0)
345 setrlimit(logger=logger,stack=6e9,ignore=
True)
347 getrlimit(logger=logger)
349 with rusage(logger=logger):
351 logger.info(
'Use automatic thread count.')
355 logger.info(
'Use %d threads'%(threads,))
356 ex2=openmp(ex,threads=threads)
357 MALLOC_CHECK_=self.
confint(
'MALLOC_CHECK_',-999)
358 if MALLOC_CHECK_ != -999:
359 ex2.env(MALLOC_CHECK_=str(MALLOC_CHECK_))
360 checkrun(ex2,logger=logger)
363 self.initprod.location))
364 self.initprod.deliver(
365 frominfo=initfile,logger=logger,keep=
False)
367 frominfo=bdyfile,logger=logger,keep=
False)
368 deliver_file(
'prep.log',self.
logprod[now],logger=logger,
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
Create namelists, monitor wrf simulations, generate filenames.
Generates a Fortran namelist entirely from config files.
def input_at(self, when)
Finds the input needed for the specified time.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def __init__(self, dstore, conf, section, wrf, geogdata, atime=None, ftime=None, taskname=None, times=None, inputs=None, in_item=None, in_dataset=None, one_time=False, in_anl_item=None, **kwargs)
PrepHybrid constructor.
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
in_anl_item
alternate input item for analysis time
initprod
Prep initial state product.
Raised when the spectral input files to prep_hybrid do not exist after some specified amount of time...
one_time
If True, only one time is prepped but we lie and say it should be used for all times.
def within_dt_epsilon(time1, time2, epsilon)
Returns True if time1 is within epsilon of time2, and False otherwise.
def io_form_geogrid(self)
Guesses the WRF I/O form that was used to run geogrid.
bdyprod
Mapping from time to boundary output product.
Implements the produtil.run: provides the object tree for representing shell commands.
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
times
prep_hybrid input/output times
outdir
The directory in which this task should deliver its final output.
def run_ipiece
This is the internal helper function that runs the prep_hybrid for self.run and self.runpart.
def clean(self)
Deletes all temporary files (the self.workdir).
Stores products and tasks in an sqlite3 database file.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
inputs
the hwrf.input.DataCatalog with the input data
This module allows querying resource usage and limits, as well as setting resource limits...
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
def run
Preps all times, even if they have already been prepped.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
def products
Iterates over all output products.
def log
Obtain a logging domain.
def prep_bdy_files(self)
Iterates over the FileProduct for the prep_hybrid boundary output files.
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward compatibility.
def init_namelist(self)
Constructs the prep_hybrid namelist in the Conf2Namelist object in self.nl.
in_item
hwrf.input.DataCatalog input item
logprod
Mapping from time to prep.log log file product.
Exceptions raised by the hwrf package.
def make_products(self)
Creates product objects for later file delivery.
def confstr
Alias for self.conf.getstr for section self.section.
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
geogdata
the geogrid data product
def unrun(self)
Marks all products as unavailable, but does not delete delivered files.
Runs the prep_hybrid program on GFS spectral files.
def inputiter(self)
Iterates over the list of data needed to run this class.
Raised when the prep_hybrid program cannot find WPS geogrid output data.
def prep_init_file(self)
Return the FileProduct for the prep_hybrid initial time output file.
def taskvars(self)
The dict of object-local values used for string substitution.
def runpart
Preps one time that has not yet been prepped, and returns.
def realtime(self)
Is this job a real-time forecast job?
wrf
the hwrf.wrf.WRFSimulation being run
in_dataset
hwrf.input.DataCatalog Input dataset
nl
hwrf.namelist.Conf2Namelist with the prep_hybrid namelist