1 """!Runs the real_nmm or wrf executables.
3 This module, contrary to its name, implements HWRFTask classes to
4 run the real_nmm and WRF, including tasks to make the the wrfanl and
5 ghost files. The uncoupled forecast is found in the WRFAtmos class.
6 The coupled forecast is not in this module: it is hwrf.mpipomtc
9 All classes you will want to use derive from WRFTaskBase and FcstTask,
10 classes that exist solely to simplify later code. These are the
11 classes you will want to use:
13 * RealNMM - to run the real_nmm program
14 * WRFAnl, WRFAnl4Trak - 1 minute simulation to generate wrfanl files
15 * WRFGhost, WRFGhostForPost - 1 minute simulation to generate wrfghost files
16 * WRFAtmos - full-length uncoupled forecast
17 * AnalysisCycle - short (1-6 hour) forecast that outputs wrfinput and wrfghost files, suitable for input to another AnalysisCycle
18 * hwrf.mpipomtc.WRFCoupledPOM - coupled WRF+POM forecast, in the hwrf.mpipomtc module
20 All of these classes take input from one another, or from other
21 sources. The abstract base class FcstTask keeps track of those
22 through its FcstTask.add_input() function. The WRFTaskBase class
23 provides simple functions to call add_input for you. Those inputs are
24 stored in objects of class Input2Fcst and its subclasses:
26 * Geog2WRF (WRFTaskBase.add_geogrid()) - geogrid data from hwrf.wps.Geogrid
27 * Met2WRF (WRFTaskBase.add_metgrid()) - metgrid data from hwrf.wps.Metgrid
28 * WRFInput2WRF (WRFTaskBase.add_wrfinput()) - wrfinput data from RealNMM, AnalysisCycle, hwrf.relocate.Merge or hwrf.ensda.FromPriorCycle
29 * Fort652WRF (WRFTaskBase.add_fort65()) - coupler fort.65 file from RealNMM
30 * WRFAnl2WRF (WRFTaskBase.add_wrfanl()) - wrf analysis (time 0 restart) files from WRFAnl, WRFAnl4Trak, WRFGhost, WRFGhostForPost, AnalysisCycle, hwrf.relocate.Merge or hwrf.relocate.RelocationTask (and its subclasses)
31 * WRFBdy2WRF (WRFTaskBase.add_wrfbdy()) - wrf boundary (wrfbdy) files from RealNMM
32 * Prep2WRF (WRFTaskBase.add_prep_hybrid()) - prep_hybrid files from hwrf.prep.PrepHybrid
39 import os, collections, glob, re,math, fractions, shutil, sys, time,logging
46 check_last_lines, isnonempty, remove_file
48 from produtil.run import checkrun, mpi, mpirun, bigexe, ExitStatusException,\
49 run, batchexe, runsync
51 WRFPrepMissing, WRFInputMissing, WRFBdyMissing, WRFAnlMissing, \
52 ForecastInputError, SetNestFailed, RealNMMError
53 from hwrf.numerics import to_fraction, within_dt_epsilon, to_datetime_rel, \
60 """!abstract base class of wrf/real_nmm input providers
62 This is the abstract base class of anything that gets, or creates,
63 input files for a WRF simulation without running another Task.
64 For example, something that copies the geogrid output would be a
65 subclass of Input2Fcst"""
67 """!creates a new Input2Fcst with the specified src.
69 The src is implementation-dependent and can be interpreted as
70 desired by subclasses. If no "src" is needed, None is
74 """!copies or links input files.
76 This function is unimplemented: subclasses are expected to
77 replace it. If just_check=True, checks to see if inputs are
78 available, returning True if they are and False otherwise. If
79 just_check=False, then the files are actually copied or linked.
81 This default implementation does nothing and returns True.
83 @return True on success, False otherwise"""
85 def link_product(self,product,excclass,logger,target=None,
87 """!helper function that links data
89 If just_check=True, checks to see if data is available,
90 returning True if it is, and False otherwise. If just_check
91 is False, then the file is linked from the given product to
92 the target location (basename(product.location) if no target
93 is provided). If the product is not yet available or has no
94 location, then the given exception class excclass is raised.
96 @return True on success, False otherwise
97 @param product the produtil.datastore.Product to link
98 @param excclass the class of Exception to raise on error
99 @param logger the logging.Logger to use for logging
100 @param just_check if True, just check for data, but link nothing
101 @param target the name of the link
104 logger=logging.getLogger(
'hwrf.fcsttask')
106 (L,A)=(product.location,product.available)
109 (L,A)=(product.location,product.available)
112 target=product.meta(
'basename',
'')
114 target=os.path.basename(L)
118 elif logger
is not None:
119 msg=
'%s: file is empty or non-existent'%(L,)
122 make_symlink(L, target, logger=logger,force=
True)
124 msg=
'%s: unavailable (available=%s location=%s)'%(
125 str(product.did),repr(A),repr(L))
126 if logger
is not None: logger.warning(msg)
127 if just_check:
return False
135 """!Links Geogrid data to the current directory for a wrf or
138 This class links geogrid data to the current directory for a wrf
139 or real_nmm run, taken from some source (src) object sent to
140 __init__. That src must have a geodat(domain) function that can
141 provide a Product with geogrid output data (such as
142 hwrf.wps.Geogrid.geodat())"""
143 def get_inputs(self,logger,domain,just_check=False,**kwargs):
144 """!Links geogrid data if any is available for the specified domain.
146 If just_check=True, checks for geogrid input data. Otherwise,
147 links the data using Input2Fcst.link_product. Raises
148 WRFGeogridMissing if the data is missing."""
149 if self.
src is not None and domain
is not None:
150 p=self.src.geodat(domain)
152 if not p.available: p.check()
154 just_check=just_check)
159 """!Links Metgrid data to the current directory for a wrf or
162 This class links metgrid.exe output data to the current directory
163 for a wrf or real_nmm execution. The output data is taken from
164 some source object, sent to the src argument of __init__. That
165 object must have a met_at_time(ftime) function that can provide a
166 Product for metgrid data at a specific forecast time (such as
167 hwrf.wps.Metgrid.met_at_time())"""
169 """!Links metgrid data if any is available for the specified time.
171 If just_check=True, checks for metgrid input data. Otherwise,
172 links the data using Input2Fcst.link_product. Raises
173 WRFMetgridMissing if the data is missing."""
174 if self.
src is not None and ftime
is not None:
175 p=self.src.met_at_time(ftime)
177 if not p.available: p.check()
179 just_check=just_check)
184 """!Links real_nmm wrfinput_d01 files the current directory for a
187 This class links real_nmm wrfinput_d01 files the current directory
188 for a wrf run. It gets those files from a source object specified
189 in the src argument to __init__. The src must have a
190 wrfinput_at_time(ftime) function that can provide a Product for
191 metgrid data at a specific forecast time (such as
192 hwrf.fcsttask.RealNMM.wrfinput_at_time())."""
193 def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
194 """!Links wrfinput_d01 files.
196 If just_check=True, checks for wrfinput_d01 data. Otherwise,
197 links the data using Input2Fcst.link_product. Raises
198 WRFInputMissing if the data is missing."""
199 if self.
src is not None and atime
is not None and domain
is not None:
200 p=self.src.wrfinput_at_time(atime,domain)
202 if not p.available: p.check()
204 target=
'wrfinput_d01',
205 just_check=just_check)
210 """!Links real_nmm fort.65 files the current directory for a wrf run.
212 This class links real_nmm fort.65 files the current directory for
213 a wrf run. The files are taken from some source object, specified
214 in the src argument to __init__. The src must have a
215 fort65_at_time(atime,domain) function that provides a Product for
216 the fort.65 file. See hwrf.fcsttask.RealNMM.fort65_at_time() for
218 def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
219 """!Links coupler input data for the specified domain and time.
221 If just_check=True, checks for a coupler "fort.65" input
222 product. Otherwise, links the data using
223 Input2Fcst.link_product to the filename "fort.65" in the
224 current working directory. Raises WRFInputMissing if the data
226 @param atime the analysis time
227 @param domain the wrf domain (a number, string name or WRFDomain)
228 @param logger a logging.Logger for logging messages
229 @param just_check if True, just check for data, otherwise link it
230 @param kwargs ignored"""
231 if self.
src is not None and atime
is not None and domain
is not None:
232 p=self.src.fort65_at_time(atime,domain)
234 if not p.available: p.check()
237 just_check=just_check)
242 """!Links wrfanl or ghost files the current directory for a wrf run.
244 This class links wrfanl or ghost files the current directory for a
245 wrf run. The files come from some source object, sent to the src
246 argument of __init__. That object must have a
247 wrfanl_at_time(atime,domain) function like
248 hwrf.fcsttask.WRFAnl.wrfanl_at_time()"""
250 """!creates a WRFAnl2WRF for the specified source and domain"""
251 super(WRFAnl2WRF,self).
__init__(src)
253 def get_inputs(self,logger,atime,domain,wrfanl='wrfanl',
254 just_check=
False,**kwargs):
255 """!Links wrfanl files for the specified domain and time.
257 If just_check=True, checks to see if there should be wrfanl
258 files for the specified domain and time. Otherwise, links the
259 data using Input2Fcst.link_product to the filename "fort.65"
260 in the current working directory. Raises WRFAnlMissing if the
262 @param atime the analysis time
263 @param domain the wrf domain (a number, string name or WRFDomain)
264 @param logger a logging.Logger for logging messages
265 @param wrfanl the beginning of the link filename. Usually this is
266 "wrfanl", "wrfghost" or "ghost".
267 @param just_check if True, just check for data, otherwise link it
268 @param kwargs ignored"""
269 if self.
src is not None and atime
is not None and domain
is not None\
271 p=self.src.wrfanl_at_time(atime,domain)
273 if not p.available: p.check()
275 str(wrfanl)+
'_d<domain>_<date>',domain.get_grid_id(),
276 atime,domain.nocolons)
278 p,WRFAnlMissing,logger,target=localname,
279 just_check=just_check)
288 """!Links real_nmm wrfbdy_d01 files the current directory for a wrf
291 This class links real_nmm wrfbdy_d01 files the current directory
292 for a wrf run. Those files are taken from some source object
293 specified in the src argument to __init__. The source must have a
294 wrfbdy_at_time(atime,domain) function like
295 hwrf.fcsttask.RealNMM.wrfbdy_at_time() """
296 def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
297 """!Links a wrfbdy file for the specified domain and time.
299 If just_check=True, checks for a wrfbdy input product.
300 Otherwise, links the data using Input2Fcst.link_product to the
301 filename "fort.65" in the current working directory. Raises
302 WRFBdyMissing if the data is missing.
303 @param atime the analysis time
304 @param domain the wrf domain (a number, string name or WRFDomain)
305 @param logger a logging.Logger for logging messages
306 @param just_check if True, just check for data, otherwise link it
307 @param kwargs ignorex"""
308 if self.
src is not None and atime
is not None and domain
is not None:
309 p=self.src.wrfbdy_at_time(atime,domain)
311 if not p.available: p.check()
314 just_check=just_check)
318 """!Links prep_hybrid files to the current directory for a real_nmm
321 This class links prep_hybrid files to the current directory for a
322 real_nmm run. The files must come from some source object,
323 specified in the src argument to __init__. That object must have
324 a products() function that behaves like
325 hwrf.prep.PrepHybrid.products() when called with a time= and name=
328 """!Links prep_hybrid output files for the specified time index.
330 If just_check=True, checks for the input product's
331 availability. Otherwise, links the data using
332 Input2Fcst.link_product to the filename "fort.65" in the
333 current working directory. Raises WRFPrepMissing if the data
335 @param times a list of integers, 0 for the initial time, 1 for the first boundary input time, and so on.
336 @param logger a logging.Logger for logging messages
337 @param just_check if True, just check for data, otherwise link it
338 @param kwargs ignored"""
339 if self.
src is not None and times:
340 for i
in xrange(len(times)):
341 logger.info(
'Look for prep at time '+str(i))
343 what =
'init' if(i==0)
else 'bdy'
344 prod=[p
for p
in self.src.products(time=t,name=what)]
346 if just_check:
return False
347 raise WRFPrepMissing(
'No prep %s data for t=%s'%(
348 what,t.strftime(
'%Y%m%d-%H%M%S')))
350 if not prod.available: prod.check()
353 just_check=just_check):
354 if just_check:
return False
357 'When looking for prep data, src is none or times is false:'
358 'src=%s times=%s'%(repr(src),repr(times)))
359 if just_check:
return False
364 """!abstract base class of anything that runs or prepares input
367 Abstract base class of anything that runs a forecast model, or
368 prepares input to a forecast model. This should not be
369 instantiated directly. It exists just to simplify code in
371 def __init__(self,dstore,conf,section,outdir=None,taskname=None,
373 """!Creates a FcstTask object.
375 Creates a new FcstTask object. All parameters are passed to the
376 superclass constructor, hwrf.hwrftask.HWRFTask.__init__()"""
377 assert(taskname
is not None)
378 if taskname
is None: taskname=section
379 super(FcstTask,self).
__init__(dstore,conf,section,taskname=taskname,
380 outdir=outdir,**kwargs)
381 self.
inputs=collections.defaultdict(list)
383 """!is there any input data of this type to this task?
385 Returns True if there is at least one input source for the
386 specified input type. That is, if someone called
387 add_input(typename,(something)). Returns False otherwise.
389 @param typename the typename value that was sent to add_input
390 @returns True if add_input was called with that typename, or
392 return self.inputs.has_key(typename)
394 """!add input of a specified type
396 Adds an input of the given type typename that should be
397 provided by the given object. The object should be a subclass
399 @param typename the type of input
400 @param inobj the Input2Fcst object that will provide the input"""
401 self.
inputs[typename].append(inobj)
404 """!check if input data is available
406 Checks all inputs of the given typenames to make sure
407 link_input would work if called with the same parameters.
408 Returns True if link_input should succeed, and False if it
409 would fail. This is a simple wrapper around link_input with
410 just_check=True. However, subclasses may override this to
411 perform additional checks, such as for a coupled model.
413 @param typenames a list of typenames that were sent to add_input
414 @return True if all inputs are available, and False otherwise
415 @param kwargs passed to link_input()"""
416 return self.
link_input(typenames,just_check=
True,**kwargs)
418 """!link or check for inputs
420 Links all inputs of types given in typenames (an iterable) by
421 calling obj.get_inputs on anything sent to self.add_input. If
422 multiple input sources are available for a given input type,
423 then only the first one that has input is used.
425 Do not use the just_check option: it is part of the internal
426 implementation of check_input; if you need to just check the
427 inputs, use check_input instead. If just_check=True, then
428 nothing is linked. Instead, the routine just checks to see if
429 the inputs are available. That is the same as calling
430 check_input. However, subclasses may override check_input to
431 check additional inputs as part of a coupled model.
433 @param typenames a list of typenames to check, as sent to
435 @param kwargs passed to Input2Fcst.get_inputs()
436 @param just_check if True, just check for data, do not link.
437 Do not use this argument - it is part of the internal
438 implementation of this class. If you want to check for
439 inputs, call check_input() instead, as subclasses may
440 override that function to provide additional checks."""
441 if isinstance(typenames,basestring): typenames=( typenames, )
443 for typename
in typenames:
444 logger.info(
'Look for input of type %s with kwargs=%s'
445 %(typename,repr(kwargs)))
446 if typename
in self.
inputs:
447 thelist=self.
inputs[typename]
450 for inputter
in thelist:
451 logger.info(
'Check %s for input of type %s'
452 %(inputter,typename))
455 found=(inputter.get_inputs(
456 logger,just_check=just_check,**kwargs)
460 'Found input type %s in inputter #%d (%s)'
461 %(repr(typename),itry,repr(inputter)))
465 'Could not get input type %s in inputter'
467 %(repr(typename),itry,repr(inputter)))
468 except (ForecastInputError,KeyError)
as e:
470 'cannot get %s files due to exception: %s'
471 %(typename,str(e)),exc_info=
True)
474 msg=
'%s: could not find input files of this type. '\
475 'Giving up.'%(typename,)
481 raise ForecastInputError(msg)
489 """!base class of classes that run wrf or real_nmm
491 This is the abstract base class of tasks that run real or WRF.
492 The purpose of this class is simply to reduce code duplication."""
493 def __init__(self,dstore,conf,section,wrf,keeprun=True,**kwargs):
496 Creates a WRFTaskBase for the specified datastore, conf and
497 section. The wrf argument is a WRFSimulation. It will not be
498 used directly: instead the wrf is copied, and the copy is
499 used. If keeprun=True, then the output of the simulation,
500 after running it, will not be scrubbed. Other keyword
501 arguments are passed to the superclass FcstTask.
503 @param dstore the produtil.datastore.Datastore to use
504 @param conf the hwrf.config.HWRFConfig that provides configuration ifnormation
505 @param section the config section in conf
506 @param wrf the hwrf.wrf.WRFSimulation object that is being run
507 @param keeprun if True, the simulation temporary files are not deleted
508 @param kwargs passed unmodified to FcstTask.__init__()"""
509 super(WRFTaskBase,self).
__init__(dstore,conf,section,**kwargs)
514 def _set_wrf_proc_config(self,wrf,logger=None):
515 """!sets nproc_x, nproc_y and I/O server settings
517 This is a protected member function meant to be used by the
518 make_wrf implementation in subclasses. This class does not
519 use it. It sets the WRF nest_pes_x and nest_pes_y, and sets I/O
520 server settings if desired based on config options:
522 @param nest_pes_x,nest_pes_y compute grid dimensions per domain
523 @param nproc_x,nproc_y compute grid dimensions
524 @param nio_groups number of I/O server groups
525 @param nio_tasks_per_group number of servers per group
526 @param poll_servers set to True to poll I/O servers. This
527 generally decreases the number of I/O server groups needed
530 if logger
is None: logger=self.
log()
531 assert(wrf
is not None)
532 nio_groups=max(0,self.
confint(
'nio_groups',1))
538 nio_tasks_per_group=self.
confstr(
'nio_tasks_per_group',
'0').strip().strip(
',').strip()
539 if ',' in nio_tasks_per_group:
540 nio_tpg_split=nio_tasks_per_group.split(
',')
542 nio_tpg_split=nio_tasks_per_group.split()
543 nio_tasks_per_group=
','.join(nio_tpg_split)
546 if len(nio_tpg_split) > 1:
547 for num
in nio_tpg_split:
548 total_nio_tpg+=int(num)
550 nio_tasks_per_group=max(0,self.
confint(
'nio_tasks_per_group',0))
551 total_nio_tpg=nio_tasks_per_group
553 poll_servers=self.
confbool(
'poll_servers',
True)
555 if nio_groups*total_nio_tpg > 0:
556 if len(nio_tpg_split) > 1:
558 'Using %d nio_groups. With a possible nio_tasks_per_group '
559 'domain scheme up to %s domains with poll_servers=%s'%(
560 nio_groups,repr(nio_tasks_per_group),repr(poll_servers)))
563 'Using %d groups of %d io tasks with poll_servers=%s'%(
564 nio_groups,nio_tasks_per_group,repr(poll_servers)))
566 wrf.set_io_servers(nio_tasks_per_group,nio_groups,poll_servers)
568 logger.debug(
'Not setting io_server settings.')
579 nproc_x=self.
confint(
'nproc_x',-999)
580 nproc_y=self.
confint(
'nproc_y',-999)
582 multi_sids=self.conf.getstr(
'config',
'multistorm_sids',
'-999').strip().strip(
',').strip()
583 comm_start=self.
confstr(
'dm_task_split.comm_start',
'-999').strip().strip(
',').strip()
584 nest_pes_x=self.
confstr(
'dm_task_split.nest_pes_x',
'-999').strip().strip(
',').strip()
585 nest_pes_y=self.
confstr(
'dm_task_split.nest_pes_y',
'-999').strip().strip(
',').strip()
588 if nest_pes_x==
'-999' and nest_pes_y==
'-999' and comm_start==
'-999':
589 nest_pes_x_d01=int(nest_pes_x)
590 nest_pes_y_d01=int(nest_pes_y)
591 comm_start_d01=int(comm_start)
593 if ',' in comm_start:
594 comm_start_split=comm_start.split(
',')
596 comm_start_split=comm_start.split()
597 comm_start=
','.join(comm_start_split)
598 comm_start_d01=int(comm_start_split[0])
600 if ',' in nest_pes_x:
601 nest_pes_x_split=nest_pes_x.split(
',')
603 nest_pes_x_split=nest_pes_x.split()
604 nest_pes_x=
','.join(nest_pes_x_split)
605 nest_pes_x_d01=int(nest_pes_x_split[0])
607 if ',' in nest_pes_y:
608 nest_pes_y_split=nest_pes_y.split(
',')
610 nest_pes_y_split=nest_pes_y.split()
611 nest_pes_y=
','.join(nest_pes_y_split)
612 nest_pes_y_d01=int(nest_pes_y_split[0])
614 if multi_sids==
'-999':
617 if ',' in multi_sids:
618 storms=len(multi_sids.split(
','))
620 storms=len(multi_sids.split())
623 if (nest_pes_x_d01>0
and nest_pes_y_d01>0
and comm_start>=0) \
624 or (nest_pes_x_d01==-1
and nest_pes_y_d01==-1
and comm_start_d01==-1):
625 if storms >0
and nest_pes_x_d01>0
and nest_pes_y_d01>0:
626 inest_pes_x=self.conf.getstr(
627 'wrf_%sstorm'%storms,
'nest_pes_x',
'-999').strip().strip(
',').strip()
628 inest_pes_y=self.conf.getstr(
629 'wrf_%sstorm'%storms,
'nest_pes_y',
'-999').strip().strip(
',').strip()
630 if inest_pes_x==
'-999' and inest_pes_y==
'-999':
631 logger.error(
"Trying to set nest_pes x and y for %s storms, "
632 "but no '[wrf_%sstorm]' section in conf file. "
633 "Will set dm_task_split values to -1, which will "
634 "cause wrf to decide the task mesh ."%
635 (repr(storms),repr(storms)))
640 nest_pes_x=inest_pes_x
641 nest_pes_y=inest_pes_y
642 if ',' in nest_pes_x:
643 nest_pes_x_split=nest_pes_x.split(
',')
645 nest_pes_x_split=nest_pes_x.split()
646 nest_pes_x=
','.join(nest_pes_x_split)
647 nest_pes_x_d01=int(nest_pes_x_split[0])
649 if ',' in nest_pes_y:
650 nest_pes_y_split=nest_pes_y.split(
',')
652 nest_pes_y_split=nest_pes_y.split()
653 nest_pes_y=
','.join(nest_pes_y_split)
654 nest_pes_y_d01=int(nest_pes_y_split[0])
656 logger.debug(
'Setting dm_task_split comm_start=%s, nest_pes_x=%s, nest_pes_y=%s'%
657 (repr(comm_start),repr(nest_pes_x),repr(nest_pes_y)))
658 wrf.set_dm_task_split(comm_start, nest_pes_x, nest_pes_y,
659 comm_start_d01=comm_start_d01, nest_pes_x_d01=nest_pes_x_d01,
660 nest_pes_y_d01=nest_pes_y_d01)
662 if (nproc_x>0
and nproc_y>0)
or (nproc_x==-1
and nproc_y==-1):
663 logger.debug(
'No dm_task_split so Setting nproc_x=%d nproc_y=%d'%(nproc_x,nproc_y))
664 wrf.set_nprocs(nproc_x,nproc_y)
666 logger.debug(
'No dm_task_split and Not setting nproc_x or nproc_y (%s,%s)'%(
667 repr(nproc_x),repr(nproc_y)))
671 """!creates a WRFSimulation object for this class
673 Given the wrf object passed to the constructor, creates a new
674 WRFSimulation that is modified for this object. For example,
675 this class may need to add output of a restart and wrfinput
676 file at the end of the simulation, or enable hourly output of
678 @param wrf the wrf argument to __init__"""
681 """!adds a geogrid input source
683 Adds an input source (via self.add_input) that will provide
684 the output of WPS geogrid.exe. The given object must have a
685 geodat member function which takes a WRFDomain as its argument
686 and returns a Product to link. Returns self.
688 @param g the src argument to Geog2WRF.__init__()
692 """!adds a metgrid input source
694 Adds an input source (via self.add_input) that will provide
695 the output of WPS metgrid.exe. The given object must have a
696 met_at_time function that returns a Product to link for a
697 specified forecast time. Returns self.
699 @param m the src argument to Met2WRF.__init__()
703 """!adds a prep_hybrid input source
705 Adds an input source (via self.add_input) that will provide
706 the output of the prep_hybrid program. The given object must
707 have a products function that iterates over products for a
708 given name='bdy' or name='init' and a time=F for a given
709 forecast time F. Returns self.
711 @param p the src argument to Prep2WRF.__init__()
716 """!returns True if prep_hybrid was requested, and False
720 """!adds a wrfinput_d01 input source
722 Adds an input source (via self.add_input) that will provide
723 the wrfinput output file from real_nmm. The given object must
724 have a wrfinput_at_time(atime,domain) function that returns a
725 Product for a given analysis time and WRFDomain object.
728 @param r the src argument to WRFInput2WRF.__init__()
732 """!adds a wrfbdy_d01 input source
734 Adds an input source (via self.add_input) that will provide
735 the wrfbdy output of a real_nmm run. The given object must
736 have a wrfbdy_at_time(atime,domain) function that returns a
737 Product to link for a specified analysis time and WRFDomain
738 object. Returns self.
740 @param r the src argument to WRFBdy2WRF.__init__()
744 """!adds a coupler fort.65 input source
746 Adds an input source (via self.add_input) that will provide
747 the fort.65 output from real_nmm. given object must have a
748 fort65_at_time(atime,domain) function that returns a Product
749 to link for a specified analysis time and WRFDomain object.
752 @param r the src argument to Fort652WRF.__init__ """
755 """!add a fort.65, wrfinput_d01 and wrfbdy_d01 input source
757 This is a convenience function that simply passes its argument
758 to self.add_fort65, add_wrfinput and add_wrfbdy in that order.
761 @param r the src argument to Fort652WRF.__init__(),
762 WRFBdy2WRF.__init__() and WRFInput2WRF.__init__()
768 """!add a wrfanl input source for a specific domain
770 Adds an input source (via self.add_input) that will provide
771 the wrfanl output file from a prior run of wrf.exe. The given
772 object must have a wrfanl_at_time function that returns a
773 Product to link for a specified analysis time and domain.
776 @param r the src argument to WRFAnl2WRF.__init__()
777 @param domain the domain for whom this is the wrfanl product
779 name=
'wrfanl-%s'%(domain.name,)
782 """!called from constructor, creates Products
784 This is called from the WRFTaskBase constructor. Subclasses
785 should re-implement this method to generate internal
786 information about what products this class can provide, so
787 that self.products() and self.deliver_products() will function
788 properly. The default implementation does nothing."""
790 """!links or copies fixed data files
792 Links or copies fix files needed by WRF. Will copy if the
793 link_wrf_fix=no in this task's config section. Otherwise, the
795 link_files=self.confbool(
'link_wrf_fix',
True)
796 act=
'link' if link_files
else 'copy'
797 for confitem
in (
'fix.eta_lookup',
'fix.track',
'fix.wrf_other'):
798 pattern=self.confstr(confitem,
'')
802 '%s: no conf entry for this fix file or set of fix files'
805 logger.info(
'Will %s WRF fix files from %s to current directory.'
807 for src
in glob.glob(pattern):
808 tgt=re.sub(
'^hwrf_',
'',os.path.basename(src))
809 if link_files
or os.path.isdir(src):
810 make_symlink(src,tgt,logger=logger,force=
True)
812 deliver_file(src,tgt,logger=logger)
814 """!Checks to see if all needed input is available."""
817 """!Links all inputs provided by the various add_* member
820 @param just_check if True, just check to see if data is
821 available. If False, actually copy."""
822 use_prep = (
'prep' in self.
inputs)
826 'prep',times=[t
for t
in self.__wrf.bdytimes()],
827 just_check=just_check)
828 for domain
in self.
__wrf:
830 'geogrid',domain=domain,just_check=just_check)
831 for ftime
in self.__wrf.bdytimes():
833 'metgrid',ftime=ftime,just_check=just_check)
835 if 'wrfinput' in self.
inputs:
837 'wrfinput',domain=self.__wrf.get_moad(),
838 atime=self.__wrf.simstart(),just_check=just_check)
839 if 'wrfbdy' in self.
inputs:
841 'wrfbdy',domain=self.__wrf.get_moad(),
842 atime=self.__wrf.simstart(),just_check=just_check)
843 if 'fort65' in self.
inputs:
845 'fort65',domain=self.__wrf.get_moad(),
846 atime=self.__wrf.simstart(),just_check=just_check)
847 MOAD=self.__wrf.get_moad()
848 for domain
in self.
__wrf:
850 name=
'wrfanl-%s'%(domain.name,)
853 name,domain=domain,atime=self.__wrf.simstart(),
854 just_check=just_check)
857 """!Returns True if all metgrid files are needed as input to
860 def run_exe(self,exename='wrf',not_allranks=False,runner=None,
862 """!runs the executable
864 Runs the executable this task is responsible for running.
865 Determines if the program ran correctly. The exename is the
866 name of the argument in the [exe] section of the
869 @param not_allranks By default (False) all ranks are used to
870 run the executable. Pass not_allranks=True to run on only
873 @param runner pass a produtil.prog.Runner object if desired.
874 This overrides any decision of what to run: the exename and
875 not_allranks will be ignored, and whatever is supplied in
876 runner is simply passed to produtil.run.run.
878 @param sleeptime passed to produtil.run.run to determine how
879 often to check the child process. By default, the sleeptime
880 option in this task's config section is used, or if that is
883 @param exename if no runner is specified, self.getexe(exename)
884 is called to get the executable path, and it is run as an
886 if sleeptime
is None:
892 runner=mpirun(mpi(exe))
894 runner=mpirun(mpi(exe),allranks=
True)
895 sync_frequently=self.
confbool(
'sync_frequently',
True)
896 if self.
confbool(
'sync_frequently',
True):
897 run(batchexe(
'sync'))
899 stat=
run(runner,logger=logger,sleeptime=sleeptime)
900 logger.info(
'%s: exit status %d'%(exename,stat))
901 if not check_last_lines(
'rsl.out.0000',
'SUCCESS COMPLETE',
903 msg=
'%s: did not see SUCCESS COMPLETE in rsl.out.0000'%(exename,)
905 raise RealNMMError(msg)
907 logger.info(
'%s: SUCCESS COMPLETE in rsl.out.0000'%(exename,))
909 """! last step before running executable
911 Called by self.run() just before calling run_exe. The
912 default implementation does nothing. This is intended to be
913 overridden by subclasses."""
915 """!called immediately after cding to a scrub directory
917 Called by self.run() after CDing to the new directory, but
918 before doing anything else. The default implementation does
919 nothing. This is intended to be overridden by subclasses."""
921 """!run the wrf or real_nmm
923 Performs all work needed to run the program. Creates the work
924 directory, CD's to it, runs the initial_prerun(), link_fix(),
925 link_all_inputs(), make_namelist(), final_prerun(), run_exe(),
926 postrun() and deliver_products()."""
929 if os.path.exists(runhere):
930 logger.warning(
'%s: directory exists; will delete'%(runhere,))
931 assert(
not os.path.samefile(self.getdir(
'WORKhwrf'),runhere))
932 shutil.rmtree(runhere)
933 atime=self.__wrf.simstart()
934 with
NamedDir(runhere,keep=self.keeprun,logger=logger,
935 keep_on_error=
True)
as rundir:
937 logger.info(
'%s running in directory %s'%(
938 self.taskname,realcwd()))
939 self.location=runhere
940 self.initial_prerun()
942 self.link_all_inputs()
947 self.deliver_products()
948 except Exception
as e:
949 logger.critical(
'%s failed: %s'%(self.taskname,str(e)),
952 self.postmsg(
'%s: completed'%(self.taskname,))
954 """!called just after run_exe() succeeds.
956 Called by self.run() just after run_exe returns successfully.
957 The default implementation does nothing; this is intended to
958 be overridden in subclasses."""
960 """!delivers products to their destinations
962 Called by self.run() after postrun(), just before CDing out of
963 the work directory. This should deliver products to their
964 destinations. The default implementation raises
965 NotImplementedError. This MUST be overridden in subclasses."""
966 raise NotImplementedError(
'A WRFTaskBase subclass did not '
967 'implement deliver_products')
969 """!generates the wrf namelist.input file
971 Runs set_ij_start (swcorner_dynamic) to generate the i & j
972 start locations for domain 2, then generates the namelist.
973 @param filename the namelist.input filename
974 @param logger the logging.Logger to log messages (optional)"""
975 if logger
is None: logger=self.
log()
976 domlat=self.conf.getfloat(
'config',
'domlat')
977 domlon=self.conf.getfloat(
'config',
'domlon')
978 s=self.
wrf().swcorner_dynamic(self.
getexe(
'swcorner_dynamic'),
980 with open(filename,
'wt')
as nlin:
983 """!Returns the WRFSimulation object used by this task."""
987 """!Returns the WRFSimulation object used by this task.
989 This property has the same effect as self.wrf(), but this is a
990 property instead. Hence, you can type self.sim instead of
1002 """!a HWRFTask subclass that runs real_nmm
1004 This subclass of WRFTaskBase runs the real_nmm to generate inputs
1005 for the provided WRFSimulation."""
1007 """!prepares products for deliver_products() and products()
1009 Generates produtil.datastore.FileProduct objects for the files
1010 that should be delivered in deliver_products() and listed by
1015 location=os.path.join(self.
outdir,
'wrfbdy_d01'))
1017 location=os.path.join(self.
outdir,
'rsl.out.0000'))
1019 location=os.path.join(self.
outdir,
'fort.65'))
1021 """!adds a prep_hybrid data source
1023 Modifies the simulation namelist generator so that it
1024 specifies use_prep_hybrid=T. Calls
1025 WRFTaskBase.add_prep_hybrid(p) to add the specified object as
1026 a prep_hybrid data source.
1028 @param p the src parameter to Prep2WRF.__init__()
1031 self.sim.nl.nl_set(
'domains',
'use_prep_hybrid',
True)
1034 """!creates a WRFSimulation object for this class
1036 Creates a copy of the specified WRFSimulation wrf. The copy
1037 has a simulation length of one minute or one physics timestep,
1038 whichever is larger. Also, it produces wrfanl files for all
1040 @param wrf the wrf argument to __init__"""
1045 """!returns True if all metgrid files are needed as input to
1049 """!deletes the wrfinput_d01 and wrfbdy_d01."""
1051 for f
in [
'wrfinput_d01',
'wrfbdy_d01']:
1053 remove_file(f,logger)
1054 except(EnvironmentError)
as e:
1055 logger.warning(
'%s: cannot remove, but continuing anyway. '
1056 'Error was: %s'%(f,str(e)),exc_info=
True)
1057 def run_exe(self,exename='real_nmm',sleeptime=30):
1058 """!runs the real_nmm program
1060 Tries several times to run the real_nmm. This is a workaround
1061 for problems encountered on Zeus when running the experimental
1062 9:3:1 HWRF. The real_nmm would exit for no apparent reason
1063 about 50% of the time. Eventually that was tracked down to a
1064 memory error caused by NetCDF, which forced us to use intio
1065 for some files, NetCDF for others, and PNetCDF for yet more
1068 @param exename the name of the real_nmm executable in the
1069 configuration file "exe" section.
1070 @param sleeptime passed to produtil.run.run to determine how
1071 often to check the child process."""
1073 logger.info(
'real section is '+repr(self.
section))
1074 maxtries=self.
confint(
'max_tries',-999)
1075 logger.info(
'real max_tries is '+repr(maxtries))
1076 maxtries=max(1,maxtries)
1077 logger.info(
'after max(1,...), real max_tries is '+str(maxtries))
1079 logger.info(
'Will try to run real %d times'%(maxtries,))
1080 for trynum
in xrange(maxtries):
1082 super(RealNMM,self).
run_exe(exename,
'prep' in
1083 self.
inputs,sleeptime=sleeptime)
1084 logger.info(
'Real succeeded. Hurrah!')
1086 except(EnvironmentError,RealNMMError,ExitStatusException)
as e:
1087 if(trynum+1<maxtries):
1089 'Real failed %d time(s). Will retry after %d '
1090 'second sleep. Error: %s'%(
1091 trynum+1,sleeptime,str(e)),exc_info=
True)
1092 time.sleep(sleeptime)
1094 'Returned from sleeping. Will now retry real.')
1097 'Real failed %d time(s). Aborting. Error: %s'
1098 %(maxtries,str(e)),exc_info=
True)
1101 """!returns the wrfinput file regardless of the time or
1105 """!returns the wrfinput file for a specified time and domain
1106 @param atime the time
1107 @param domain the WRF domain (an integer, string name or WRFDomain)
1108 @return the wrfinput file for the specified time and
1109 domain. Returns None if that time and domain are not
1111 if domain
and domain!=self.
wrf().get_moad():
return None
1112 if atime
is not None and \
1113 within_dt_epsilon(atime,self.
wrf().simstart(),self.
1118 """!returns the wrfbdy file for a specified time and domain
1119 @param atime the time
1120 @param domain the WRF domain (an integer, string name or WRFDomain)
1121 @return the wrfbdy file for the specified time and
1122 domain. Returns None if that time and domain are not
1124 if domain
and domain!=self.
wrf().get_moad():
return None
1125 if atime
is not None and \
1126 within_dt_epsilon(atime,self.
wrf().simstart(),
1131 """!returns the coupler fort.65 file for a specified time and domain
1132 @param atime the time
1133 @param domain the WRF domain (an integer, string name or WRFDomain)
1134 @return the fort.65 file for the specified time and
1135 domain. Returns None if that time and domain are not
1137 if domain
and domain!=self.
wrf().get_moad():
return None
1138 if atime
is not None and \
1139 within_dt_epsilon(atime,self.
wrf().simstart(),
1144 """!delivers products that were identified by make_products()"""
1146 self.prod_wrfinput.location))
1147 self.prod_wrfinput.deliver(frominfo=
'wrfinput_d01',keep=
False)
1148 self.prod_wrfbdy.deliver(frominfo=
'wrfbdy_d01',keep=
False)
1149 self.prod_log.deliver(frominfo=
'rsl.out.0000',keep=
False)
1150 self.prod_fort65.deliver(frominfo=
'fort.65',keep=
False)
1152 """!iterates over all products
1153 Iterates over all of this Task's products
1154 (produtil.datastore.FileProduct objects) created by
1161 """!Writes the namelist for real.
1163 Writes the namelist.input file for a call to real_nmm. This
1164 does the same as its parent class's implementation, except
1165 that the num_metgrid_levels is also overridden to match
1166 whatever WPS actually created (which may not match the
1168 @param filename the name of the namelist.input file
1169 @param logger the logging.Logger object"""
1170 if logger
is None: logger=self.
log()
1171 exepath=self.
getexe(
'hwrf_metgrid_levels')
1173 for x
in glob.glob(
'met_nmm.d01*.nc'):
1175 self.
wrf().set_metgrid_levels_from(exepath,metfile,logger)
1179 'Could not find a met_nmm.d01*.nc file for running '
1180 'hwrf_metgrid_levels.')
1204 anl_fudge_factor=fractions.Fraction(2,3)
1208 """!runs a short WRF simulation to generate wrfanl files"""
1210 """!creates the WRFSimulation object (self.wrf())
1212 Creates a copy of the specified WRFSimulation wrf. The copy
1213 has a simulation length of one minute or one physics timestep,
1214 whichever is larger. Calls the _set_wrf_proc_config to set
1215 I/O server and compute grid dimensions based on this
1216 HWRFTask's config section. Configures the WRF to produce
1217 wrfanl files for all domains.
1219 @param wrf the wrf argument to __init__
1220 @returns the new WRFSimulation"""
1226 endtime=to_fraction(MOAD.nl.nl_get(
'physics',
'nphs')) * MOAD.dt
1227 floored=max(60.0,float(math.floor(endtime*anl_fudge_factor)))
1232 wrf.set_timing(end=floored)
1233 assert(wrf.simstart()<wrf.simend())
1234 wrf.add_output(
'history',step=floored)
1238 """!the time for which analysis files are generated."""
1239 return self.
wrf().simstart()
1241 """!creates FileProduct objects for all wrfanl files."""
1243 MOAD=self.
wrf().get_moad()
1244 for domain
in self.
wrf():
1245 if domain==MOAD:
continue
1246 pname=self.
wrf().analysis_name(domain)
1247 loc=os.path.join(self.
outdir,pname)
1251 """!returns the wrfanl product for the specified domain
1252 @param domain the domain for whom the wrfanl file is requested"""
1253 if domain
is None:
return None
1254 if not domain
in self.
sim:
return None
1255 domain=self.
sim[domain]
1256 if domain.is_moad():
return None
1259 """!get a wrfanl file for a specified domain and analysis time
1261 Returns the wrfanl product for the specified domain and analysis
1262 time, if one exists, and otherwise, None.
1263 @param domain the domain (an integer, string name or WRFDomain)
1264 @param atime the analysis time """
1265 if atime!=self.
anltime:
return None
1266 if domain
not in self.
wrf():
return None
1267 d=self.
wrf()[domain]
1268 if d.is_moad() :
return None
1271 """!delivers products during a call to run()
1273 Delivers the products (FileProduct objects) that were
1274 identified by make_products."""
1276 logger.info(
'%s: make directory'%(self.
outdir,))
1278 for domain
in self.
wrf():
1279 if domain
not in self.
_products:
continue
1284 p.deliver(frominfo=os.path.basename(p.location),logger=logger,
1287 """!iterates over products
1288 Iterates over all Product objects that were identified by
1290 @param domain if provided, only this domain's Products are yielded"""
1292 if self._products.has_key(domain):
1295 for domain
in self.
wrf():
1296 if self._products.has_key(domain):
1301 """!runs a short WRF simulation to generate wrfanl
1304 This is the same as the superclass, WRFAnl, but it renames wrfanl
1305 files to "ghost" instead of "wrfanl". Also, history output is
1308 """!makes the WRFSimulation object for this class.
1310 Creates a copy of the WRFSimulation object wrf. This first
1311 calls the WRFAnl.make_wrf, and then disables the history
1313 @param wrf the wrf argument to __init__"""
1314 wrf=super(WRFGhost,self).
make_wrf(wrf)
1315 wrf.set_wrfanl_outname(
'ghost_d<domain>')
1316 wrf.add_output(
'history',step=3600*3,end=9*3600,start=3*3600)
1319 """!same as get_wrfanl()
1320 @param domain the domain of interest"""
1321 if domain
is None:
return None
1322 if not domain
in self.
sim:
return None
1323 domain=self.
sim[domain]
1324 if domain.is_moad():
return None
1344 """!runs wrfanl and generates a track file
1346 This subtask of WRFAnl modifies the time of the 1 minute forecast
1347 wrfout file from the outer domain to have the analysis time
1348 instead so that the tracker can be run to get the initial storm
1350 def __init__(self,dstore,conf,section,wrf,trakdoms=JUST_MOAD,
1351 trakname=
'trankin_d<domain>',**kwargs):
1354 Constructor for WRFAnl4Trak.
1355 @param trakdoms what domains should be used for the tracks?
1356 either JUST_MOAD or ALL_DOMS or a list of WRFDomain objects
1357 @param trakname the track output filename.
1358 @param dstore,conf,section,wrf,kwargs passed to superclass constructor"""
1361 super(WRFAnl4Trak,self).
__init__(dstore,conf,section,wrf,**kwargs)
1363 """! iterates over track input domains
1365 Iterates over domains specified by the trakdoms parameter to
1368 yield self.sim.get_moad()
1370 for domain
in self.
sim:
yield domain
1374 """!creates Product objects for deliver_products() and products()
1376 Called from constructor. This make_products adds all products
1377 produced by WRFAnl.make_products, but adds a product for the
1378 outer domain one minute forecast history stream (wrfout) file
1379 called "trackin_d<domain>". That file is intended to be used
1380 to generate the parent domain vortex information."""
1381 WRFAnl.make_products(self)
1388 idom=sim[domain].get_grid_id()
1390 trackin_name_pattern,idom,
1391 sim.simstart(),sim.get_nocolons())
1393 loc=os.path.join(self.
outdir,name)
1396 prod[
'stream']=
'history'
1400 """!modifies trakin_* files to say they're at time 0
1402 Calls retime_wrfout for all domains whose trackin_d0X files
1403 are requested. This produces the modified 1 minute forecast
1404 wrfout files that lie and say they're from the analysis time."""
1409 """!modifies a trakin file's internal timestamp
1411 If possible, modifies the stated output time in the one minute
1412 forecast trackin_d<domain> file to the analysis time. Does
1413 this for one domain. For intio files, it does not modify the
1414 file, but instead simply renames it. That is done because, at
1415 last test, the post does not actually look in an intio wrfout
1416 file for the time, so no modification is necessary.
1417 @param domain the domain whose trakin file is being modified"""
1423 logger.info(
'simend = '+repr(self.
wrf().simend())+
' = '+str(self.
wrf().simend()))
1424 infile=dom.get_output(stream,self.
wrf().simend(),logger=logger)
1425 logger.info(
'infile = '+str(infile.path()))
1426 io_form=sim.io_form_for(stream)%100
1428 logger.warning(
'%s: renaming instead of running wrfout_newtime '
1429 'since file is (probably) not NetCDF: '
1430 'io_form=%d'%(infile,io_form))
1431 os.rename(infile.path(),name)
1434 shutil.copy2(infile.path(),name)
1435 checkrun(bigexe(self.
getexe(
'wrfout_newtime'))[name,
1436 sim.simstart().strftime(
'%Y-%m-%d_%H:%M:%S')])
1437 except Exception
as e:
1438 logger.error(
'%s (from %s): unable to modify time in NetCDF '
1439 'file: %s'%(infile.path(), name, str(e)))
1443 """!delivers products
1445 Delivers all products (FileProduct objects) created by
1446 make_products, including the new trackin_d<domain> added by
1447 this subclass, and all products added by the superclass
1450 for d,p
in self.trackin_prod.iteritems():
1452 logger=self.
log(),keep=
False)
1455 """!returns a trakin (trackin) Product
1457 Returns a trakin (trackin) Product. If a domain is specified,
1458 returns the trackin file for that domain. Otherwise, the
1459 MOAD (outermost domain) is assumed.
1460 @param domain None, or the domain of interest
1461 @return a Product for the trakin file for the specified domain"""
1463 domain=self.sim.get_moad()
1468 def products(self,domain=None,domains=None,time=None,stream=None):
1469 """!iterates over all products from this task.
1471 Iterates over all products from this task. This class adds
1472 the trackin_d0* files.
1474 @param domain the domain of interest
1475 @param domains a list of domains of interest
1476 @param time the analysis time
1477 @param stream the output stream"""
1478 if not domains
and not time
and not stream:
1479 for p
in WRFAnl.products(self,domain=domain):
yield p
1481 if stream
and stream!=
'history':
return
1482 if time
and time!=self.
wrf().simstart():
return
1484 for d,p
in self.trackin_prod.iteritems():
1486 if domains
and simd
not in domains:
continue
1487 if domain
is not None and domain!=simd:
continue
1499 """!runs wrf to generate ghost (wrfanl) and wrfout files
1501 This class generates wrfghost files, and wrfout files, for each
1502 domain. The wrfout files happen at the end of a roughly one
1503 minute forecast so that they can be correctly post-processed.
1504 However, their internal timestamp has been changed to be for the
1505 analysis time. This class derives from WRFAnl4Trak instead of
1506 WRFGhost to reuse the wrfout renamer functionality, but it may be
1507 used in place of a WRFGhost object.
1509 Note that a wrf ghost file is a wrfanl file. There is no
1510 difference whatsoever, except in the nomenclature in the HWRF
1512 def __init__(self,dstore,conf,section,wrf,trakdoms=ALL_DOMS,
1513 trakname=
'ghout_d<domain>',**kwargs):
1516 Creates a WRFGhostForPost, passing all arguments to
1517 WRFAnl4Trak.__init__()"""
1518 super(WRFGhostForPost,self).
__init__(
1519 dstore,conf,section,wrf,trakdoms,trakname,**kwargs)
1521 """!creates a WRFSimulation that calls its wrfanl files "ghost"
1523 wrf=super(WRFGhostForPost,self).
make_wrf(wrf)
1524 wrf.set_wrfanl_outname(
'ghost_d<domain>')
1528 """!returns the ghost wrfout product for the specified domain."""
1529 assert(domain
is not None)
1533 """!same as get_wrfanl()
1535 This works exactly like get_wrfanl()
1536 @param domain the domain of interest"""
1537 if domain
is None:
return None
1538 if not domain
in self.
sim:
return None
1539 domain=self.
sim[domain]
1540 if domain.is_moad():
return None
1545 """!The uncoupled HWRF forecast Task.
1547 This class runs an atmosphere-only WRF run, using wrfanl files
1548 from a prior WRFAnl simulation. It encapsulates an
1549 ExternalWRFTask, which performs the actual tracking of products.
1550 This allows the post-processors and wrf copy tasks to keep track
1551 of the model's output while the model is running. That subtask
1552 shows up as ".mon" (for "monitor") relative to this task (so if
1553 this task is wrfatmos, then the external task is wrfatmos.mon)."""
1554 def __init__(self,dstore,conf,section,wrf,keeprun=True,
1555 wrfdiag_stream=
'auxhist1',**kwargs):
1556 """!WRFAtmos constructor
1558 The constructor for WRFAtmos.
1559 @param dstore the produtil.datastore.Datastore object
1560 @param conf the HWRFConfig object for configuration information
1561 @param section the section name within that HWRFConfig object
1562 @param wrf the hwrf.wrf.WRFSimulation that is to be executed
1563 @param keeprun True means the output directory should NOT be deleted
1564 @param wrfdiag_stream stream that generates wrfdiag files
1565 @param kwargs passed to the parent class constructor."""
1569 super(WRFAtmos,self).
__init__(dstore,conf,section,wrf,
1570 keeprun=keeprun,**kwargs)
1579 """!creates a WRFSimulation for an uncoupled forecast run
1581 Creates a WRFSimulation for an uncoupled forecast run. Runs
1582 the superclass WRFAtmos.make_wrf. Instructs the resulting
1583 WRFSimulation to take analysis files as input, and calls the
1584 _set_wrf_proc_config to set I/O server and compute grid
1586 @param wrf the wrf argument to __init__"""
1587 wrf=super(WRFAtmos,self).
make_wrf(wrf)
1593 """!deletes output files
1595 Deletes output files. See the
1596 hwrf.wrf.ExternalWRFTask.unrun for details."""
1599 """!runs the uncoupled WRF forecast
1601 Performs all work needed to run the program. Sets the state
1602 to produtil.datastore.UNSTARTED. Creates the work directory,
1603 CD's to it, runs the initial_prerun, link_fix,
1604 link_all_inputs, make_namelist, final_prerun, run_exe, postrun
1605 and deliver_products."""
1607 self.__ex.state=UNSTARTED
1608 super(WRFAtmos,self).
run()
1610 """!iterates over all WRF products.
1612 Iterates over all WRF products. See the
1613 hwrf.wrf.ExternalWRFTask.products() for details.
1614 @param kwargs passed to hwrf.wrf.ExternalWRFTask.products()"""
1615 for p
in self.__ex.products(**kwargs):
1618 """!iterates over all WRF products.
1620 Synonym for products(). Iterates over all WRF products. See the
1621 hwrf.wrf.ExternalWRFTask.products() for details.
1622 @param kwargs passed to hwrf.wrf.ExternalWRFTask.products()"""
1623 for p
in self.__ex.products(**kwargs):
1626 """!checks the status of the WRF simulation.
1628 Checks the status of the WRF simulation. Should only be
1629 used while the simulation is running. This is intended to be
1630 run by jobs other than the WRF job, such as the
1631 post-processors, to monitor the WRF simulation as it
1634 @param kwargs passed to hwrf.wrf.ExternalWRFTask.wrf_check()"""
1635 self.__ex.wrf_check(**kwargs)
1636 def run_exe(self,exename='wrf',not_allranks=False,runner=None,
1638 """!runs the wrf program
1640 Called from run(), this runs the wrf program after the
1641 directory is set up.
1643 @param exename sent to getexe() to get the path to wrf if
1644 runner is unspecified
1645 @param not_allranks if True, only use one MPI rank. Do not use.
1646 @param runner an alternative produtil.prog.Runner to execute.
1647 @param sleeptime seconds to sleep between checks of the wrf
1648 executables completion. The default sleeptime, if none is
1649 specified is 60 seconds rather than the usual 30."""
1650 if sleeptime
is None:
1651 sleeptime=self.
conffloat(
'sleeptime',60)
1653 exename=exename,not_allranks=not_allranks,
1654 runner=runner,sleeptime=sleeptime)
1656 """!checks the wrf state and updates it in the HWRF database file
1658 Checks the state of the WRF simulation and copies that
1659 information to self.state in the produtil.datastore.Datastore.
1660 See hwrf.wrf.ExternalWRFTask.update_state() for details."""
1661 with self.dstore.transaction()
as t:
1662 self.__ex.update_state()
1663 self.
state=self.__ex.state
1667 Has no effect. This is present only because it is a
1668 requirement of the parent class. No delivery is required
1669 because the products are all UpstreamFile objects, so the
1670 delivery state is set by the post-processors when calling the
1671 "check" function of each product.
1672 @param args,kwargs ignored"""
1675 class AnalysisCycle(WRFGhost):
1676 """!runs wrf for an analysis cycle
1678 This class is similar to a WRFGhost run, except that it runs a
1679 longer simulation (typically 1-6 hours), and provides wrfghost and
1680 wrfinput files at the end of it. It also requires wrfghost and
1681 wrfinput files as input. Hence, one can run another AnalysisCycle
1682 after this one, using the output of the previous. Note that this
1683 implementation relies on the fact that wrfghost, wrfanl and
1684 restart files are all exactly the same file format (just different
1685 times and domains)."""
1686 def __init__(self,dstore,conf,section,wrf,simlen=None,
1687 keeprun=
False,**kwargs):
1688 """!constructor for AnalysisCycle
1690 Constructor for the AnalysisCycle class.
1691 @param dstore the produtil.datastore.Datastore to use
1692 @param conf the hwrf.config.HWRFConfig that provides configuration ifnormation
1693 @param section the config section in conf
1694 @param wrf the hwrf.wrf.WRFSimulation object that is being run
1695 @param keeprun if True, the simulation temporary files are not deleted
1696 @param simlen simulation length in seconds
1697 @param kwargs passed unmodified to FcstTask.__init__()"""
1699 if simlen
is not None:
1700 simlen=to_timedelta(simlen)
1702 super(AnalysisCycle,self).
__init__(dstore,conf,section,wrf,
1708 """!called from constructor, creates Products
1710 This is called from the WRFTaskBase constructor. It creates
1711 products that will be used by deliver_products() and
1714 MOAD=self.
wrf().get_moad()
1717 for domain
in self.
wrf():
1718 pname=
'wrfinput_d%02d'%(domain.get_grid_id(),)
1719 loc=os.path.join(self.
outdir,pname)
1737 """!analysis cycle end time
1739 The time at end of the analysis cycle, at which the output
1740 wrfanl and wrfinput files are available."""
1741 return self.
wrf().simend()
1745 """!analysis cycle start time
1747 The time at beginning of the analysis cycle, at which the
1748 input wrfanl and wrfinput files must be provided."""
1749 return self.
wrf().simstart()
1753 """!simulation length
1755 The requested length of the simulation. Note that this may
1756 not be the same as the actual simulation length
1757 (self.anlouttime-self.anlintime) due to the model timestep."""
1763 """!creates the WRFSimulation object
1765 Called from the constructor. Generates and returns a new
1766 WRFSimulation object that meets these requirements:
1767 1. Read in wrfanl and wrfinput files
1768 2. Output a restart and wrfinput file after self.simlen time
1769 3. Disable history and auxhist 1-3.
1770 @param wrf the wrf argument to __init__"""
1779 simlen=to_timedelta(self.
simlen)
1782 wrf.set_timing(end=simlen)
1785 wrf.add_output(
'restart',step=simlen,start=simlen,end=simlen)
1790 wrf.add_output(
'inputout',step=simlen,start=simlen,end=simlen,
1791 outname=
'wrfinputout_d<domain>')
1792 input_outname=wrf.nl.nl_get(
'time_control',
'input_outname')
1793 assert(input_outname==
'wrfinputout_d<domain>')
1796 assert(domain.has_output(
'inputout'))
1797 assert(domain.has_output(
'restart'))
1801 domain.nl.nl_set(
'physics',
'ntrack',1000)
1808 if not wrf.has_output(
'history'):
1809 wrf.add_output(
'history',start=simlen*2,step=simlen,end=simlen*3)
1810 for stream
in [
'auxhist1',
'auxhist2',
'auxhist3',
'history' ]:
1811 io_form=wrf.io_form_for(stream)
1813 if domain.has_output(stream):
1814 domain.hide_output(stream)
1818 """!the wrfinput_d01 output Product for the specified time and domain
1820 @return the wrfinput output Product for the specified time and
1821 domain or None if that time and domain are not valid.
1823 @param atime the time
1824 @param domain the domain (integer, string name or WRFDomain)"""
1825 if domain
and domain!=self.
wrf().get_moad():
return None
1826 if atime
is not None and \
1831 """!the wrfinput_d01 output product
1833 @returns the output wrfinput Product"""
1836 """!delivers products from temporary areas
1838 Copies output products from temporary areas to long-term
1839 storage locations."""
1840 MOAD=self.
wrf().get_moad()
1844 logger.info(
'Deliver products: rst and wrfinput at time %s'%(repr(when),))
1847 for domain
in self.
wrf():
1848 pname=
'wrfinput_d%02d'%(domain.get_grid_id(),)
1849 loc=os.path.join(self.
outdir,pname)
1851 fromloc=domain.get_output(
'inputout',when,logger=logger).path()
1852 logger.info(
'Deliver moad %s from %s'%(str(domain),fromloc))
1853 self.prod_wrfinput.deliver(
1854 frominfo=fromloc,logger=logger,keep=
False)
1856 fromloc=domain.get_output(
'restart',when,logger=logger).path()
1857 logger.info(
'Deliver nest %s from %s'%(str(domain),fromloc))
1859 frominfo=fromloc,logger=logger,keep=
False)
1860 for out
in domain.get_outputs(
'history'):
1862 logger.info(
'Deliver nest %s from %s'%(str(domain),fromloc))
1863 toloc=os.path.join(self.
outdir,fromloc)
1864 logger.info(
'Deliver nest %s to %s'%(str(domain),toloc))
1866 fromloc,toloc,keep=
True,logger=logger)
1868 """Converts a WRFOutput to a Product.
1870 Returns a Product for a WRFOutput. The Product is cached in
1871 self.__prodcache and as_product() will return that cached
1872 value if one is available. Otherwise, a new one is created.
1873 Call clear_cached_products() to clear the cache.."""
1874 if self.__prodcache.has_key(wrfout):
1878 outdir=self[
'outdir']
1879 assert(outdir
is not None)
1880 loc=os.path.join(outdir,os.path.basename(wrfout.path()))
1881 with self.dstore.transaction()
as t:
1883 prodname=rel,location=loc)
1884 uf[
'stream']=wrfout.stream()
1886 if relocate: uf.location=loc
1889 def products(self,domain=None,domains=None,stream=None,time=None,relocate=False):
1890 """!iterates over all selected products
1892 Iterates over all products for the specified domain, or all
1893 products if the domain is unspecified or None.
1894 @param domain the requested domain (an integer, string name or WRFDomain)
1895 @param domains a list of domains of interest
1896 @param time the analysis time
1897 @param stream the output stream"""
1898 if domains
is not None:
1901 for out
in d.get_all_outputs(time):
1904 for out
in d.get_outputs(stream):
1907 yield self.
as_product(d.get_output(stream,time),
1912 elif domain==self.
wrf().get_moad():
1915 elif domain
is None:
1917 for p
in super(AnalysisCycle,self).
products(domain):
def initial_prerun(self)
deletes the wrfinput_d01 and wrfbdy_d01.
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def get_inputs(self, logger, atime, domain, wrfanl='wrfanl', just_check=False, kwargs)
Links wrfanl files for the specified domain and time.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Create namelists, monitor wrf simulations, generate filenames.
Links real_nmm fort.65 files the current directory for a wrf run.
def wrfanl_at_time(self, atime, domain)
get a wrfanl file for a specified domain and analysis time
def run(self)
runs the uncoupled WRF forecast
def products(self)
iterates over all products Iterates over all of this Task's products (produtil.datastore.FileProduct objects) created by make_products()
def simlen(self)
simulation length
prod_wrfinput
the wrfinput output file at the end of the simulation
def has_input(self, typename)
is there any input data of this type to this task?
abstract base class of anything that runs or prepares input for a forecast model
def link_input(self, typenames, just_check=False, kwargs)
link or check for inputs
def make_wrf(self, wrf)
creates a WRFSimulation object for this class
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def get_inputs(self, logger, atime, domain, just_check=False, kwargs)
Links coupler input data for the specified domain and time.
def deliver_products(self)
delivers products during a call to run()
def wrfinput_at_time
the wrfinput_d01 output Product for the specified time and domain
def deliver_products(self)
delivers products to their destinations
runs wrf to generate ghost (wrfanl) and wrfout files
taskname
Read-only property: the name of this task.
def get_inputs(self, logger, atime, domain, just_check=False, kwargs)
Links a wrfbdy file for the specified domain and time.
def add_wrfinput(self, r)
adds a wrfinput_d01 input source
A subclass of Product that represents file delivery.
def need_all_metgrid(self)
Returns True if all metgrid files are needed as input to this Task.
The base class of tasks run by the HWRF system.
def get_wrfanl(self, domain)
returns the wrfanl product for the specified domain
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def anlintime(self)
analysis cycle start time
a HWRFTask subclass that runs real_nmm
def get_trackin
returns a trakin (trackin) Product
storminfo
The hwrf.storminfo.StormInfo describing the vitals information for the storm processed by this HWRFTa...
def add_prep_hybrid(self, p)
adds a prep_hybrid data source
base class of classes that run wrf or real_nmm
def retime_wrfout(self, domain)
modifies a trakin file's internal timestamp
def check_input(self, typenames, kwargs)
check if input data is available
Links Geogrid data to the current directory for a wrf or real_nmm run.
def _set_wrf_proc_config
sets nproc_x, nproc_y and I/O server settings
runs a short WRF simulation to generate wrfanl files
def get_ghout(self, domain)
returns the ghost wrfout product for the specified domain.
runs wrfanl and generates a track file
__simlen
The simulation length as a datetime.timedelta.
def wrfinput_at_time
returns the wrfinput file for a specified time and domain
domain
the domain for which this object provides wrfanl files, or None if it provides wrfanl files for all d...
low-level wrf implementation, underlying hwrf.wrf
trackin_name
mapping from WRFDomain to product name for trakin files
trackin_prod
mapping from WRFDomain to product for trakin files
_wrfdiag_stream
WRFAtmos constructor.
def run_exe
runs the wrf program
def __init__(self, dstore, conf, section, wrf, trakdoms=JUST_MOAD, trakname='trankin_d< domain >', kwargs)
constructor
def final_prerun(self)
last step before running executable
def check_all_inputs(self)
Checks to see if all needed input is available.
def confbool
Alias for self.conf.getbool for section self.section.
def anlouttime(self)
analysis cycle end time
section
The confsection in self.section for this HWRFTask (read-only)
Links prep_hybrid files to the current directory for a real_nmm run.
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
prod_log
the rsl.out.0000 output file product
prod_wrfbdy
the wrfbdy output product
def wrf(self)
Returns the WRFSimulation object used by this task.
def __init__(self, dstore, conf, section, wrf, keeprun=True, wrfdiag_stream='auxhist1', kwargs)
WRFAtmos constructor.
def products
iterates over all products from this task.
A piece of data produced by a Task.
outdir
The directory in which this task should deliver its final output.
def update_state(self)
checks the wrf state and updates it in the HWRF database file
def sim(self)
Returns the WRFSimulation object used by this task.
def deliver_products(self)
delivers products that were identified by make_products()
def add_geogrid(self, g)
adds a geogrid input source
def make_products(self)
called from constructor, creates Products
Stores products and tasks in an sqlite3 database file.
The uncoupled HWRF forecast Task.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
def get_inputs(self, logger, domain, just_check=False, kwargs)
Links geogrid data if any is available for the specified domain.
def add_real(self, r)
add a fort.65, wrfinput_d01 and wrfbdy_d01 input source
Time manipulation and other numerical routines.
def make_wrf(self, wrf)
creates the WRFSimulation object
def link_all_inputs
Links all inputs provided by the various add_* member functions.
def postrun(self)
modifies trakin_* files to say they're at time 0
def add_prep_hybrid(self, p)
adds a prep_hybrid input source
def products(self, kwargs)
iterates over all WRF products.
def add_fort65(self, r)
adds a coupler fort.65 input source
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
def run(self)
run the wrf or real_nmm
def __init__(self, dstore, conf, section, wrf, trakdoms=ALL_DOMS, trakname='ghout_d< domain >', kwargs)
constructor
def fort65_at_time
returns the coupler fort.65 file for a specified time and domain
def conffloat
Alias for self.conf.getfloat for section self.section.
def __init__(self, dstore, conf, section, wrf, keeprun=True, kwargs)
constructor
def wrfbdy_at_time
returns the wrfbdy file for a specified time and domain
def log
Obtain a logging domain.
monitors a running wrf simulation
def link_fix(self)
links or copies fixed data files
def make_wrf(self, wrf)
creates a WRFSimulation that calls its wrfanl files "ghost" files instead.
def get_wrfinput(self)
the wrfinput_d01 output product
def make_wrf(self, wrf)
creates a WRFSimulation for an uncoupled forecast run
def get_ghost(self, domain)
same as get_wrfanl()
def anltime(self)
the time for which analysis files are generated.
inputs
a mapping of typename to a list of input objects
def products
iterates over products Iterates over all Product objects that were identified by make_products.
def unrun(self)
deletes output files
def track_domains(self)
iterates over track input domains
def run_exe
runs the executable
def deliver_products(self)
delivers products from temporary areas
def use_prep_hybrid(self)
returns True if prep_hybrid was requested, and False otherwise.
Links Metgrid data to the current directory for a wrf or real_nmm run.
def parse_wrf_outname(outname, grid_id, date, nocolons)
generates wrf filenames from templates like construct_filename
def initial_prerun(self)
called immediately after cding to a scrub directory
def add_input(self, typename, inobj)
add input of a specified type
def __init__(self, dstore, conf, section, outdir=None, taskname=None, kwargs)
Creates a FcstTask object.
def make_namelist
generates the wrf namelist.input file
def make_wrf(self, wrf)
creates the WRFSimulation object (self.wrf())
Exceptions raised by the hwrf package.
prod_wrfinput
the wrfinput output product
def confstr
Alias for self.conf.getstr for section self.section.
def make_namelist
Writes the namelist for real.
def __init__(self, src, domain)
creates a WRFAnl2WRF for the specified source and domain
def get_wrfinput(self)
returns the wrfinput file regardless of the time or domain
runs a short WRF simulation to generate wrfanl files named "ghost"
def run_exe
runs the real_nmm program
def __init__(self, dstore, conf, section, wrf, simlen=None, keeprun=False, kwargs)
runs wrf for an analysis cycle
def postrun(self)
called just after run_exe() succeeds.
def deliver_products(self, args, kwargs)
does nothing
def add_wrfanl(self, r, domain)
add a wrfanl input source for a specific domain
def wrf_check(self, kwargs)
checks the status of the WRF simulation.
def make_wrf(self, wrf)
makes the WRFSimulation object for this class.
keeprun
if False, temporary files are deleted, otherwise they're kept
def products
iterates over all selected products
state
runs the uncoupled WRF forecast
def get_inputs(self, logger, times, just_check=False, kwargs)
Links prep_hybrid output files for the specified time index.
prod_fort65
the output product for the coupler fort.65 input file
def add_wrfbdy(self, r)
adds a wrfbdy_d01 input source
def add_metgrid(self, m)
adds a metgrid input source
def exproducts(self, kwargs)
iterates over all WRF products.
Links wrfanl or ghost files the current directory for a wrf run.
def get_ghost(self, domain)
same as get_wrfanl()
def make_wrf(self, wrf)
creates a WRFSimulation object for this class
def make_products(self)
creates FileProduct objects for all wrfanl files.
Represents a Product created by an external workflow.
Links real_nmm wrfbdy_d01 files the current directory for a wrf run.
def deliver_products(self)
delivers products
dt_epsilon
epsilon value for time equality comparisons
def get_inputs(self, logger, ftime, just_check=False, kwargs)
Links metgrid data if any is available for the specified time.
def make_products(self)
called from constructor, creates Products
def need_all_metgrid(self)
returns True if all metgrid files are needed as input to this Task
def make_products(self)
prepares products for deliver_products() and products()
def make_products(self)
creates Product objects for deliver_products() and products()