HWRF  trunk@4391
fcsttask.py
1 """!Runs the real_nmm or wrf executables.
2 
3 This module, contrary to its name, implements HWRFTask classes to
4 run the real_nmm and WRF, including tasks to make the the wrfanl and
5 ghost files. The uncoupled forecast is found in the WRFAtmos class.
6 The coupled forecast is not in this module: it is hwrf.mpipomtc
7 instead.
8 
9 All classes you will want to use derive from WRFTaskBase and FcstTask,
10 classes that exist solely to simplify later code. These are the
11 classes you will want to use:
12 
13 * RealNMM - to run the real_nmm program
14 * WRFAnl, WRFAnl4Trak - 1 minute simulation to generate wrfanl files
15 * WRFGhost, WRFGhostForPost - 1 minute simulation to generate wrfghost files
16 * WRFAtmos - full-length uncoupled forecast
17 * AnalysisCycle - short (1-6 hour) forecast that outputs wrfinput and wrfghost files, suitable for input to another AnalysisCycle
18 * hwrf.mpipomtc.WRFCoupledPOM - coupled WRF+POM forecast, in the hwrf.mpipomtc module
19 
20 All of these classes take input from one another, or from other
21 sources. The abstract base class FcstTask keeps track of those
22 through its FcstTask.add_input() function. The WRFTaskBase class
23 provides simple functions to call add_input for you. Those inputs are
24 stored in objects of class Input2Fcst and its subclasses:
25 
26 * Geog2WRF (WRFTaskBase.add_geogrid()) - geogrid data from hwrf.wps.Geogrid
27 * Met2WRF (WRFTaskBase.add_metgrid()) - metgrid data from hwrf.wps.Metgrid
28 * WRFInput2WRF (WRFTaskBase.add_wrfinput()) - wrfinput data from RealNMM, AnalysisCycle, hwrf.relocate.Merge or hwrf.ensda.FromPriorCycle
29 * Fort652WRF (WRFTaskBase.add_fort65()) - coupler fort.65 file from RealNMM
30 * WRFAnl2WRF (WRFTaskBase.add_wrfanl()) - wrf analysis (time 0 restart) files from WRFAnl, WRFAnl4Trak, WRFGhost, WRFGhostForPost, AnalysisCycle, hwrf.relocate.Merge or hwrf.relocate.RelocationTask (and its subclasses)
31 * WRFBdy2WRF (WRFTaskBase.add_wrfbdy()) - wrf boundary (wrfbdy) files from RealNMM
32 * Prep2WRF (WRFTaskBase.add_prep_hybrid()) - prep_hybrid files from hwrf.prep.PrepHybrid
33 """
34 
35 ## @var __all__
36 # List of variables imported by "from hwrf.fcsttask import *"
37 __all__=['FcstTask']
38 
39 import os, collections, glob, re,math, fractions, shutil, sys, time,logging
43 
44 from produtil.cd import NamedDir
45 from produtil.fileop import realcwd, deliver_file, make_symlink, \
46  check_last_lines, isnonempty, remove_file
47 from produtil.datastore import FileProduct, UpstreamFile, UNSTARTED
48 from produtil.run import checkrun, mpi, mpirun, bigexe, ExitStatusException,\
49  run, batchexe, runsync
50 from hwrf.exceptions import WRFGeogridMissing, WRFMetgridMissing, \
51  WRFPrepMissing, WRFInputMissing, WRFBdyMissing, WRFAnlMissing, \
52  ForecastInputError, SetNestFailed, RealNMMError
53 from hwrf.numerics import to_fraction, within_dt_epsilon, to_datetime_rel, \
54  to_timedelta
55 from hwrf.wrf import ExternalWRFTask
56 
57 ########################################################################
58 
59 class Input2Fcst(object):
60  """!abstract base class of wrf/real_nmm input providers
61 
62  This is the abstract base class of anything that gets, or creates,
63  input files for a WRF simulation without running another Task.
64  For example, something that copies the geogrid output would be a
65  subclass of Input2Fcst"""
66  def __init__(self,src):
67  """!creates a new Input2Fcst with the specified src.
68 
69  The src is implementation-dependent and can be interpreted as
70  desired by subclasses. If no "src" is needed, None is
71  acceptable."""
72  self.src=src
73  def get_inputs(self,just_check=False,**kwargs):
74  """!copies or links input files.
75 
76  This function is unimplemented: subclasses are expected to
77  replace it. If just_check=True, checks to see if inputs are
78  available, returning True if they are and False otherwise. If
79  just_check=False, then the files are actually copied or linked.
80 
81  This default implementation does nothing and returns True.
82 
83  @return True on success, False otherwise"""
84  return True
85  def link_product(self,product,excclass,logger,target=None,
86  just_check=False):
87  """!helper function that links data
88 
89  If just_check=True, checks to see if data is available,
90  returning True if it is, and False otherwise. If just_check
91  is False, then the file is linked from the given product to
92  the target location (basename(product.location) if no target
93  is provided). If the product is not yet available or has no
94  location, then the given exception class excclass is raised.
95 
96  @return True on success, False otherwise
97  @param product the produtil.datastore.Product to link
98  @param excclass the class of Exception to raise on error
99  @param logger the logging.Logger to use for logging
100  @param just_check if True, just check for data, but link nothing
101  @param target the name of the link
102  """
103  if logger is None:
104  logger=logging.getLogger('hwrf.fcsttask')
105  assert(isinstance(product,produtil.datastore.Product))
106  (L,A)=(product.location,product.available)
107  if not A:
108  product.check()
109  (L,A)=(product.location,product.available)
110  if L and A:
111  if not target:
112  target=product.meta('basename','')
113  if not target:
114  target=os.path.basename(L)
115  if just_check:
116  if isnonempty(L):
117  return True
118  elif logger is not None:
119  msg='%s: file is empty or non-existent'%(L,)
120  logger.warning(msg)
121  return False
122  make_symlink(L, target, logger=logger,force=True)
123  return True
124  msg='%s: unavailable (available=%s location=%s)'%(
125  str(product.did),repr(A),repr(L))
126  if logger is not None: logger.warning(msg)
127  if just_check: return False
128  raise excclass(msg)
129  ##@var src
130  # the implementation-defined source, used by subclasses
131  # @protected
132 
133 ########################################################################
135  """!Links Geogrid data to the current directory for a wrf or
136  real_nmm run.
137 
138  This class links geogrid data to the current directory for a wrf
139  or real_nmm run, taken from some source (src) object sent to
140  __init__. That src must have a geodat(domain) function that can
141  provide a Product with geogrid output data (such as
142  hwrf.wps.Geogrid.geodat())"""
143  def get_inputs(self,logger,domain,just_check=False,**kwargs):
144  """!Links geogrid data if any is available for the specified domain.
145 
146  If just_check=True, checks for geogrid input data. Otherwise,
147  links the data using Input2Fcst.link_product. Raises
148  WRFGeogridMissing if the data is missing."""
149  if self.src is not None and domain is not None:
150  p=self.src.geodat(domain)
151  if p:
152  if not p.available: p.check()
153  return self.link_product(p,WRFGeogridMissing,logger,
154  just_check=just_check)
155  return False
156 
157 ########################################################################
159  """!Links Metgrid data to the current directory for a wrf or
160  real_nmm run.
161 
162  This class links metgrid.exe output data to the current directory
163  for a wrf or real_nmm execution. The output data is taken from
164  some source object, sent to the src argument of __init__. That
165  object must have a met_at_time(ftime) function that can provide a
166  Product for metgrid data at a specific forecast time (such as
167  hwrf.wps.Metgrid.met_at_time())"""
168  def get_inputs(self,logger,ftime,just_check=False,**kwargs):
169  """!Links metgrid data if any is available for the specified time.
170 
171  If just_check=True, checks for metgrid input data. Otherwise,
172  links the data using Input2Fcst.link_product. Raises
173  WRFMetgridMissing if the data is missing."""
174  if self.src is not None and ftime is not None:
175  p=self.src.met_at_time(ftime)
176  if p:
177  if not p.available: p.check()
178  return self.link_product(p,WRFMetgridMissing,logger,
179  just_check=just_check)
180  return False
181 
182 ########################################################################
184  """!Links real_nmm wrfinput_d01 files the current directory for a
185  wrf run.
186 
187  This class links real_nmm wrfinput_d01 files the current directory
188  for a wrf run. It gets those files from a source object specified
189  in the src argument to __init__. The src must have a
190  wrfinput_at_time(ftime) function that can provide a Product for
191  metgrid data at a specific forecast time (such as
192  hwrf.fcsttask.RealNMM.wrfinput_at_time())."""
193  def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
194  """!Links wrfinput_d01 files.
195 
196  If just_check=True, checks for wrfinput_d01 data. Otherwise,
197  links the data using Input2Fcst.link_product. Raises
198  WRFInputMissing if the data is missing."""
199  if self.src is not None and atime is not None and domain is not None:
200  p=self.src.wrfinput_at_time(atime,domain)
201  if p:
202  if not p.available: p.check()
203  return self.link_product(p,WRFInputMissing,logger,
204  target='wrfinput_d01',
205  just_check=just_check)
206  return False
207 
208 ########################################################################
210  """!Links real_nmm fort.65 files the current directory for a wrf run.
211 
212  This class links real_nmm fort.65 files the current directory for
213  a wrf run. The files are taken from some source object, specified
214  in the src argument to __init__. The src must have a
215  fort65_at_time(atime,domain) function that provides a Product for
216  the fort.65 file. See hwrf.fcsttask.RealNMM.fort65_at_time() for
217  an example."""
218  def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
219  """!Links coupler input data for the specified domain and time.
220 
221  If just_check=True, checks for a coupler "fort.65" input
222  product. Otherwise, links the data using
223  Input2Fcst.link_product to the filename "fort.65" in the
224  current working directory. Raises WRFInputMissing if the data
225  is missing.
226  @param atime the analysis time
227  @param domain the wrf domain (a number, string name or WRFDomain)
228  @param logger a logging.Logger for logging messages
229  @param just_check if True, just check for data, otherwise link it
230  @param kwargs ignored"""
231  if self.src is not None and atime is not None and domain is not None:
232  p=self.src.fort65_at_time(atime,domain)
233  if p:
234  if not p.available: p.check()
235  return self.link_product(p,WRFInputMissing,logger,
236  target='fort.65',
237  just_check=just_check)
238  return False
239 
240 ########################################################################
242  """!Links wrfanl or ghost files the current directory for a wrf run.
243 
244  This class links wrfanl or ghost files the current directory for a
245  wrf run. The files come from some source object, sent to the src
246  argument of __init__. That object must have a
247  wrfanl_at_time(atime,domain) function like
248  hwrf.fcsttask.WRFAnl.wrfanl_at_time()"""
249  def __init__(self,src,domain):
250  """!creates a WRFAnl2WRF for the specified source and domain"""
251  super(WRFAnl2WRF,self).__init__(src)
252  self.domain=domain
253  def get_inputs(self,logger,atime,domain,wrfanl='wrfanl',
254  just_check=False,**kwargs):
255  """!Links wrfanl files for the specified domain and time.
256 
257  If just_check=True, checks to see if there should be wrfanl
258  files for the specified domain and time. Otherwise, links the
259  data using Input2Fcst.link_product to the filename "fort.65"
260  in the current working directory. Raises WRFAnlMissing if the
261  data is missing.
262  @param atime the analysis time
263  @param domain the wrf domain (a number, string name or WRFDomain)
264  @param logger a logging.Logger for logging messages
265  @param wrfanl the beginning of the link filename. Usually this is
266  "wrfanl", "wrfghost" or "ghost".
267  @param just_check if True, just check for data, otherwise link it
268  @param kwargs ignored"""
269  if self.src is not None and atime is not None and domain is not None\
270  and domain==self.domain: # yay for run-on sentences!
271  p=self.src.wrfanl_at_time(atime,domain)
272  if p:
273  if not p.available: p.check()
275  str(wrfanl)+'_d<domain>_<date>',domain.get_grid_id(),
276  atime,domain.nocolons)
277  return self.link_product(
278  p,WRFAnlMissing,logger,target=localname,
279  just_check=just_check)
280  return False
281  ## @var domain
282  # the domain for which this object provides wrfanl files, or None
283  # if it provides wrfanl files for all domains except the MOAD
284  # @protected
285 
286 ########################################################################
288  """!Links real_nmm wrfbdy_d01 files the current directory for a wrf
289  run.
290 
291  This class links real_nmm wrfbdy_d01 files the current directory
292  for a wrf run. Those files are taken from some source object
293  specified in the src argument to __init__. The source must have a
294  wrfbdy_at_time(atime,domain) function like
295  hwrf.fcsttask.RealNMM.wrfbdy_at_time() """
296  def get_inputs(self,logger,atime,domain,just_check=False,**kwargs):
297  """!Links a wrfbdy file for the specified domain and time.
298 
299  If just_check=True, checks for a wrfbdy input product.
300  Otherwise, links the data using Input2Fcst.link_product to the
301  filename "fort.65" in the current working directory. Raises
302  WRFBdyMissing if the data is missing.
303  @param atime the analysis time
304  @param domain the wrf domain (a number, string name or WRFDomain)
305  @param logger a logging.Logger for logging messages
306  @param just_check if True, just check for data, otherwise link it
307  @param kwargs ignorex"""
308  if self.src is not None and atime is not None and domain is not None:
309  p=self.src.wrfbdy_at_time(atime,domain)
310  if p:
311  if not p.available: p.check()
312  return self.link_product(p,WRFBdyMissing,logger,
313  target='wrfbdy_d01',
314  just_check=just_check)
315  return False
316 ########################################################################
318  """!Links prep_hybrid files to the current directory for a real_nmm
319  run.
320 
321  This class links prep_hybrid files to the current directory for a
322  real_nmm run. The files must come from some source object,
323  specified in the src argument to __init__. That object must have
324  a products() function that behaves like
325  hwrf.prep.PrepHybrid.products() when called with a time= and name=
326  argument."""
327  def get_inputs(self,logger,times,just_check=False,**kwargs):
328  """!Links prep_hybrid output files for the specified time index.
329 
330  If just_check=True, checks for the input product's
331  availability. Otherwise, links the data using
332  Input2Fcst.link_product to the filename "fort.65" in the
333  current working directory. Raises WRFPrepMissing if the data
334  is missing.
335  @param times a list of integers, 0 for the initial time, 1 for the first boundary input time, and so on.
336  @param logger a logging.Logger for logging messages
337  @param just_check if True, just check for data, otherwise link it
338  @param kwargs ignored"""
339  if self.src is not None and times:
340  for i in xrange(len(times)):
341  logger.info('Look for prep at time '+str(i))
342  t=times[i]
343  what = 'init' if(i==0) else 'bdy'
344  prod=[p for p in self.src.products(time=t,name=what)]
345  if not prod:
346  if just_check: return False
347  raise WRFPrepMissing('No prep %s data for t=%s'%(
348  what,t.strftime('%Y%m%d-%H%M%S')))
349  prod=prod[0]
350  if not prod.available: prod.check()
351  if not self.link_product(prod,WRFPrepMissing,logger,
352  'fort.%03d'%(i+900),
353  just_check=just_check):
354  if just_check: return False
355  else:
356  logger.warning(
357  'When looking for prep data, src is none or times is false:'
358  'src=%s times=%s'%(repr(src),repr(times)))
359  if just_check: return False
360  return True
361 
362 ########################################################################
364  """!abstract base class of anything that runs or prepares input
365  for a forecast model
366 
367  Abstract base class of anything that runs a forecast model, or
368  prepares input to a forecast model. This should not be
369  instantiated directly. It exists just to simplify code in
370  subclasses."""
371  def __init__(self,dstore,conf,section,outdir=None,taskname=None,
372  **kwargs):
373  """!Creates a FcstTask object.
374 
375  Creates a new FcstTask object. All parameters are passed to the
376  superclass constructor, hwrf.hwrftask.HWRFTask.__init__()"""
377  assert(taskname is not None)
378  if taskname is None: taskname=section
379  super(FcstTask,self).__init__(dstore,conf,section,taskname=taskname,
380  outdir=outdir,**kwargs)
381  self.inputs=collections.defaultdict(list)
382  def has_input(self,typename):
383  """!is there any input data of this type to this task?
384 
385  Returns True if there is at least one input source for the
386  specified input type. That is, if someone called
387  add_input(typename,(something)). Returns False otherwise.
388 
389  @param typename the typename value that was sent to add_input
390  @returns True if add_input was called with that typename, or
391  False otherwise."""
392  return self.inputs.has_key(typename)
393  def add_input(self,typename,inobj):
394  """!add input of a specified type
395 
396  Adds an input of the given type typename that should be
397  provided by the given object. The object should be a subclass
398  of Input2Fcst.
399  @param typename the type of input
400  @param inobj the Input2Fcst object that will provide the input"""
401  self.inputs[typename].append(inobj)
402  return self
403  def check_input(self,typenames,**kwargs):
404  """!check if input data is available
405 
406  Checks all inputs of the given typenames to make sure
407  link_input would work if called with the same parameters.
408  Returns True if link_input should succeed, and False if it
409  would fail. This is a simple wrapper around link_input with
410  just_check=True. However, subclasses may override this to
411  perform additional checks, such as for a coupled model.
412 
413  @param typenames a list of typenames that were sent to add_input
414  @return True if all inputs are available, and False otherwise
415  @param kwargs passed to link_input()"""
416  return self.link_input(typenames,just_check=True,**kwargs)
417  def link_input(self,typenames,just_check=False,**kwargs):
418  """!link or check for inputs
419 
420  Links all inputs of types given in typenames (an iterable) by
421  calling obj.get_inputs on anything sent to self.add_input. If
422  multiple input sources are available for a given input type,
423  then only the first one that has input is used.
424 
425  Do not use the just_check option: it is part of the internal
426  implementation of check_input; if you need to just check the
427  inputs, use check_input instead. If just_check=True, then
428  nothing is linked. Instead, the routine just checks to see if
429  the inputs are available. That is the same as calling
430  check_input. However, subclasses may override check_input to
431  check additional inputs as part of a coupled model.
432 
433  @param typenames a list of typenames to check, as sent to
434  add_inputs()
435  @param kwargs passed to Input2Fcst.get_inputs()
436  @param just_check if True, just check for data, do not link.
437  Do not use this argument - it is part of the internal
438  implementation of this class. If you want to check for
439  inputs, call check_input() instead, as subclasses may
440  override that function to provide additional checks."""
441  if isinstance(typenames,basestring): typenames=( typenames, )
442  logger=self.log()
443  for typename in typenames:
444  logger.info('Look for input of type %s with kwargs=%s'
445  %(typename,repr(kwargs)))
446  if typename in self.inputs:
447  thelist=self.inputs[typename]
448  found=False
449  itry=0
450  for inputter in thelist:
451  logger.info('Check %s for input of type %s'
452  %(inputter,typename))
453  itry+=1
454  try:
455  found=(inputter.get_inputs(
456  logger,just_check=just_check,**kwargs)
457  is True )
458  if found:
459  logger.info(
460  'Found input type %s in inputter #%d (%s)'
461  %(repr(typename),itry,repr(inputter)))
462  break
463  else:
464  logger.warning(
465  'Could not get input type %s in inputter'
466  ' #%d (%s)'
467  %(repr(typename),itry,repr(inputter)))
468  except (ForecastInputError,KeyError) as e:
469  logger.warning(
470  'cannot get %s files due to exception: %s'
471  %(typename,str(e)),exc_info=True)
472  if found: break
473  if not found:
474  msg='%s: could not find input files of this type. '\
475  'Giving up.'%(typename,)
476  if just_check:
477  logger.warning(msg)
478  return False
479  else:
480  logger.error(msg)
481  raise ForecastInputError(msg)
482  return True
483  ##@var inputs
484  # a mapping of typename to a list of input objects
485  # @protected
486 
487 ########################################################################
489  """!base class of classes that run wrf or real_nmm
490 
491  This is the abstract base class of tasks that run real or WRF.
492  The purpose of this class is simply to reduce code duplication."""
493  def __init__(self,dstore,conf,section,wrf,keeprun=True,**kwargs):
494  """!constructor
495 
496  Creates a WRFTaskBase for the specified datastore, conf and
497  section. The wrf argument is a WRFSimulation. It will not be
498  used directly: instead the wrf is copied, and the copy is
499  used. If keeprun=True, then the output of the simulation,
500  after running it, will not be scrubbed. Other keyword
501  arguments are passed to the superclass FcstTask.
502 
503  @param dstore the produtil.datastore.Datastore to use
504  @param conf the hwrf.config.HWRFConfig that provides configuration ifnormation
505  @param section the config section in conf
506  @param wrf the hwrf.wrf.WRFSimulation object that is being run
507  @param keeprun if True, the simulation temporary files are not deleted
508  @param kwargs passed unmodified to FcstTask.__init__()"""
509  super(WRFTaskBase,self).__init__(dstore,conf,section,**kwargs)
510  self.__wrf=self.make_wrf(wrf)
511  self.make_products()
512  self.dt_epsilon=wrf.bdyepsilon()
513  self.keeprun=keeprun
514  def _set_wrf_proc_config(self,wrf,logger=None):
515  """!sets nproc_x, nproc_y and I/O server settings
516 
517  This is a protected member function meant to be used by the
518  make_wrf implementation in subclasses. This class does not
519  use it. It sets the WRF nest_pes_x and nest_pes_y, and sets I/O
520  server settings if desired based on config options:
521 
522  @param nest_pes_x,nest_pes_y compute grid dimensions per domain
523  @param nproc_x,nproc_y compute grid dimensions
524  @param nio_groups number of I/O server groups
525  @param nio_tasks_per_group number of servers per group
526  @param poll_servers set to True to poll I/O servers. This
527  generally decreases the number of I/O server groups needed
528  for a given WRF run.
529  """
530  if logger is None: logger=self.log()
531  assert(wrf is not None)
532  nio_groups=max(0,self.confint('nio_groups',1))
533 
534  # Multistorm - jtf
535  # nio_tasks_per_group may now specify values for each domain.
536  # One value for all domains, as before, or a value for each domain.
537  # ie. 4 or 4,4,2,4,2... or 12,12,8,8 (that is, double or n digits)
538  nio_tasks_per_group=self.confstr('nio_tasks_per_group','0').strip().strip(',').strip()
539  if ',' in nio_tasks_per_group:
540  nio_tpg_split=nio_tasks_per_group.split(',') #['4','2','4',...]
541  else: #in case space seperated.
542  nio_tpg_split=nio_tasks_per_group.split() #['4','2','4',...]
543  nio_tasks_per_group=','.join(nio_tpg_split) #'4,2,4, ...'
544 
545  total_nio_tpg=0
546  if len(nio_tpg_split) > 1:
547  for num in nio_tpg_split:
548  total_nio_tpg+=int(num)
549  else:
550  nio_tasks_per_group=max(0,self.confint('nio_tasks_per_group',0))
551  total_nio_tpg=nio_tasks_per_group
552 
553  poll_servers=self.confbool('poll_servers',True)
554 
555  if nio_groups*total_nio_tpg > 0:
556  if len(nio_tpg_split) > 1:
557  logger.debug(
558  'Using %d nio_groups. With a possible nio_tasks_per_group '
559  'domain scheme up to %s domains with poll_servers=%s'%(
560  nio_groups,repr(nio_tasks_per_group),repr(poll_servers)))
561  else:
562  logger.debug(
563  'Using %d groups of %d io tasks with poll_servers=%s'%(
564  nio_groups,nio_tasks_per_group,repr(poll_servers)))
565 
566  wrf.set_io_servers(nio_tasks_per_group,nio_groups,poll_servers)
567  else:
568  logger.debug('Not setting io_server settings.')
569 
570  # With the multistorm implementation the wrf executable added
571  # a new namelist called dm_task_split which takes precedence
572  # over nproc_x and nproc_y, where nest_pes_x and nest_pes_y
573  # is a list of values, for each domain. ie. namelist.input example
574  # &dm_task_split
575  # comm_start = 0,0,0,96,96,192,192,288,288,384,384,
576  # nest_pes_x = 12, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8,
577  # nest_pes_y = 16, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12,
578 
579  nproc_x=self.confint('nproc_x',-999)
580  nproc_y=self.confint('nproc_y',-999)
581 
582  multi_sids=self.conf.getstr('config','multistorm_sids','-999').strip().strip(',').strip()
583  comm_start=self.confstr('dm_task_split.comm_start','-999').strip().strip(',').strip()
584  nest_pes_x=self.confstr('dm_task_split.nest_pes_x','-999').strip().strip(',').strip()
585  nest_pes_y=self.confstr('dm_task_split.nest_pes_y','-999').strip().strip(',').strip()
586 
587  # Get the integer value of the d01 comm_start, nest_pes x and y and the number of storms
588  if nest_pes_x=='-999' and nest_pes_y=='-999' and comm_start=='-999':
589  nest_pes_x_d01=int(nest_pes_x)
590  nest_pes_y_d01=int(nest_pes_y)
591  comm_start_d01=int(comm_start)
592  else:
593  if ',' in comm_start:
594  comm_start_split=comm_start.split(',')
595  else: #in case space seperated.
596  comm_start_split=comm_start.split()
597  comm_start=','.join(comm_start_split)
598  comm_start_d01=int(comm_start_split[0])
599 
600  if ',' in nest_pes_x:
601  nest_pes_x_split=nest_pes_x.split(',')
602  else: #in case space seperated.
603  nest_pes_x_split=nest_pes_x.split()
604  nest_pes_x=','.join(nest_pes_x_split)
605  nest_pes_x_d01=int(nest_pes_x_split[0])
606 
607  if ',' in nest_pes_y:
608  nest_pes_y_split=nest_pes_y.split(',')
609  else: #in case space seperated.
610  nest_pes_y_split=nest_pes_y.split()
611  nest_pes_y=','.join(nest_pes_y_split)
612  nest_pes_y_d01=int(nest_pes_y_split[0])
613 
614  if multi_sids=='-999':
615  storms=-1
616  else:
617  if ',' in multi_sids:
618  storms=len(multi_sids.split(','))
619  else: #in case space seperated.
620  storms=len(multi_sids.split())
621 
622  # If comm_start, nest_pes_x and y are Not defined, that is -999, try nproc_x and nproc_y
623  if (nest_pes_x_d01>0 and nest_pes_y_d01>0 and comm_start>=0) \
624  or (nest_pes_x_d01==-1 and nest_pes_y_d01==-1 and comm_start_d01==-1):
625  if storms >0 and nest_pes_x_d01>0 and nest_pes_y_d01>0:
626  inest_pes_x=self.conf.getstr(
627  'wrf_%sstorm'%storms,'nest_pes_x','-999').strip().strip(',').strip()
628  inest_pes_y=self.conf.getstr(
629  'wrf_%sstorm'%storms,'nest_pes_y','-999').strip().strip(',').strip()
630  if inest_pes_x=='-999' and inest_pes_y=='-999':
631  logger.error("Trying to set nest_pes x and y for %s storms, "
632  "but no '[wrf_%sstorm]' section in conf file. "
633  "Will set dm_task_split values to -1, which will "
634  "cause wrf to decide the task mesh ."%
635  (repr(storms),repr(storms)))
636  comm_start_d01=-1
637  nest_pes_x_d01=-1
638  nest_pes_y_d01=-1
639  else:
640  nest_pes_x=inest_pes_x
641  nest_pes_y=inest_pes_y
642  if ',' in nest_pes_x:
643  nest_pes_x_split=nest_pes_x.split(',')
644  else: #in case space seperated.
645  nest_pes_x_split=nest_pes_x.split()
646  nest_pes_x=','.join(nest_pes_x_split)
647  nest_pes_x_d01=int(nest_pes_x_split[0])
648 
649  if ',' in nest_pes_y:
650  nest_pes_y_split=nest_pes_y.split(',')
651  else: #in case space seperated.
652  nest_pes_y_split=nest_pes_y.split()
653  nest_pes_y=','.join(nest_pes_y_split)
654  nest_pes_y_d01=int(nest_pes_y_split[0])
655 
656  logger.debug('Setting dm_task_split comm_start=%s, nest_pes_x=%s, nest_pes_y=%s'%
657  (repr(comm_start),repr(nest_pes_x),repr(nest_pes_y)))
658  wrf.set_dm_task_split(comm_start, nest_pes_x, nest_pes_y,
659  comm_start_d01=comm_start_d01, nest_pes_x_d01=nest_pes_x_d01,
660  nest_pes_y_d01=nest_pes_y_d01)
661  else:
662  if (nproc_x>0 and nproc_y>0) or (nproc_x==-1 and nproc_y==-1):
663  logger.debug('No dm_task_split so Setting nproc_x=%d nproc_y=%d'%(nproc_x,nproc_y))
664  wrf.set_nprocs(nproc_x,nproc_y)
665  else:
666  logger.debug('No dm_task_split and Not setting nproc_x or nproc_y (%s,%s)'%(
667  repr(nproc_x),repr(nproc_y)))
668 
669 
670  def make_wrf(self,wrf):
671  """!creates a WRFSimulation object for this class
672 
673  Given the wrf object passed to the constructor, creates a new
674  WRFSimulation that is modified for this object. For example,
675  this class may need to add output of a restart and wrfinput
676  file at the end of the simulation, or enable hourly output of
677  history files.
678  @param wrf the wrf argument to __init__"""
679  return wrf.copy()
680  def add_geogrid(self,g):
681  """!adds a geogrid input source
682 
683  Adds an input source (via self.add_input) that will provide
684  the output of WPS geogrid.exe. The given object must have a
685  geodat member function which takes a WRFDomain as its argument
686  and returns a Product to link. Returns self.
687 
688  @param g the src argument to Geog2WRF.__init__()
689  @returns self"""
690  return self.add_input('geogrid',Geog2WRF(g))
691  def add_metgrid(self,m):
692  """!adds a metgrid input source
693 
694  Adds an input source (via self.add_input) that will provide
695  the output of WPS metgrid.exe. The given object must have a
696  met_at_time function that returns a Product to link for a
697  specified forecast time. Returns self.
698 
699  @param m the src argument to Met2WRF.__init__()
700  @returns self"""
701  return self.add_input('metgrid',Met2WRF(m))
702  def add_prep_hybrid(self,p):
703  """!adds a prep_hybrid input source
704 
705  Adds an input source (via self.add_input) that will provide
706  the output of the prep_hybrid program. The given object must
707  have a products function that iterates over products for a
708  given name='bdy' or name='init' and a time=F for a given
709  forecast time F. Returns self.
710 
711  @param p the src argument to Prep2WRF.__init__()
712  @returns self"""
713  return self.add_input('prep',Prep2WRF(p))
714  @property
715  def use_prep_hybrid(self):
716  """!returns True if prep_hybrid was requested, and False
717  otherwise."""
718  return self.has_input('prep')
719  def add_wrfinput(self,r):
720  """!adds a wrfinput_d01 input source
721 
722  Adds an input source (via self.add_input) that will provide
723  the wrfinput output file from real_nmm. The given object must
724  have a wrfinput_at_time(atime,domain) function that returns a
725  Product for a given analysis time and WRFDomain object.
726  Returns self.
727 
728  @param r the src argument to WRFInput2WRF.__init__()
729  @returns self"""
730  return self.add_input('wrfinput',WRFInput2WRF(r))
731  def add_wrfbdy(self,r):
732  """!adds a wrfbdy_d01 input source
733 
734  Adds an input source (via self.add_input) that will provide
735  the wrfbdy output of a real_nmm run. The given object must
736  have a wrfbdy_at_time(atime,domain) function that returns a
737  Product to link for a specified analysis time and WRFDomain
738  object. Returns self.
739 
740  @param r the src argument to WRFBdy2WRF.__init__()
741  @returns self"""
742  return self.add_input('wrfbdy',WRFBdy2WRF(r))
743  def add_fort65(self,r):
744  """!adds a coupler fort.65 input source
745 
746  Adds an input source (via self.add_input) that will provide
747  the fort.65 output from real_nmm. given object must have a
748  fort65_at_time(atime,domain) function that returns a Product
749  to link for a specified analysis time and WRFDomain object.
750  Returns self.
751 
752  @param r the src argument to Fort652WRF.__init__ """
753  return self.add_input('fort65',Fort652WRF(r))
754  def add_real(self,r):
755  """!add a fort.65, wrfinput_d01 and wrfbdy_d01 input source
756 
757  This is a convenience function that simply passes its argument
758  to self.add_fort65, add_wrfinput and add_wrfbdy in that order.
759  Returns self.
760 
761  @param r the src argument to Fort652WRF.__init__(),
762  WRFBdy2WRF.__init__() and WRFInput2WRF.__init__()
763  @return self"""
764  self.add_fort65(r)
765  self.add_wrfinput(r)
766  return self.add_wrfbdy(r)
767  def add_wrfanl(self,r,domain):
768  """!add a wrfanl input source for a specific domain
769 
770  Adds an input source (via self.add_input) that will provide
771  the wrfanl output file from a prior run of wrf.exe. The given
772  object must have a wrfanl_at_time function that returns a
773  Product to link for a specified analysis time and domain.
774  Returns self.
775 
776  @param r the src argument to WRFAnl2WRF.__init__()
777  @param domain the domain for whom this is the wrfanl product
778  @return self"""
779  name='wrfanl-%s'%(domain.name,)
780  return self.add_input(name,WRFAnl2WRF(r,domain))
781  def make_products(self):
782  """!called from constructor, creates Products
783 
784  This is called from the WRFTaskBase constructor. Subclasses
785  should re-implement this method to generate internal
786  information about what products this class can provide, so
787  that self.products() and self.deliver_products() will function
788  properly. The default implementation does nothing."""
789  def link_fix(self):
790  """!links or copies fixed data files
791 
792  Links or copies fix files needed by WRF. Will copy if the
793  link_wrf_fix=no in this task's config section. Otherwise, the
794  files are linked."""
795  link_files=self.confbool('link_wrf_fix',True)
796  act='link' if link_files else 'copy'
797  for confitem in ('fix.eta_lookup', 'fix.track', 'fix.wrf_other'):
798  pattern=self.confstr(confitem,'')
799  logger=self.log()
800  if not pattern:
801  logger.warning(
802  '%s: no conf entry for this fix file or set of fix files'
803  %(confitem,))
804  continue
805  logger.info('Will %s WRF fix files from %s to current directory.'
806  %(act,pattern))
807  for src in glob.glob(pattern):
808  tgt=re.sub('^hwrf_','',os.path.basename(src))
809  if link_files or os.path.isdir(src):
810  make_symlink(src,tgt,logger=logger,force=True)
811  else:
812  deliver_file(src,tgt,logger=logger)
813  def check_all_inputs(self):
814  """!Checks to see if all needed input is available."""
815  return self.link_all_inputs(just_check=True)
816  def link_all_inputs(self,just_check=False):
817  """!Links all inputs provided by the various add_* member
818  functions.
819 
820  @param just_check if True, just check to see if data is
821  available. If False, actually copy."""
822  use_prep = ('prep' in self.inputs)
823  okay=True
824  if use_prep:
825  okay=okay and self.link_input(
826  'prep',times=[t for t in self.__wrf.bdytimes()],
827  just_check=just_check)
828  for domain in self.__wrf:
829  okay=okay and self.link_input(
830  'geogrid',domain=domain,just_check=just_check)
831  for ftime in self.__wrf.bdytimes():
832  okay=okay and self.link_input(
833  'metgrid',ftime=ftime,just_check=just_check)
834  if not self.need_all_metgrid() or use_prep: break
835  if 'wrfinput' in self.inputs:
836  okay=okay and self.link_input(
837  'wrfinput',domain=self.__wrf.get_moad(),
838  atime=self.__wrf.simstart(),just_check=just_check)
839  if 'wrfbdy' in self.inputs:
840  okay=okay and self.link_input(
841  'wrfbdy',domain=self.__wrf.get_moad(),
842  atime=self.__wrf.simstart(),just_check=just_check)
843  if 'fort65' in self.inputs:
844  okay=okay and self.link_input(
845  'fort65',domain=self.__wrf.get_moad(),
846  atime=self.__wrf.simstart(),just_check=just_check)
847  MOAD=self.__wrf.get_moad()
848  for domain in self.__wrf:
849  if domain!=MOAD:
850  name='wrfanl-%s'%(domain.name,)
851  if name in self.inputs:
852  okay=okay and self.link_input(
853  name,domain=domain,atime=self.__wrf.simstart(),
854  just_check=just_check)
855  return okay
856  def need_all_metgrid(self):
857  """!Returns True if all metgrid files are needed as input to
858  this Task"""
859  return False
860  def run_exe(self,exename='wrf',not_allranks=False,runner=None,
861  sleeptime=None):
862  """!runs the executable
863 
864  Runs the executable this task is responsible for running.
865  Determines if the program ran correctly. The exename is the
866  name of the argument in the [exe] section of the
867  HWRFConfig. Options:
868 
869  @param not_allranks By default (False) all ranks are used to
870  run the executable. Pass not_allranks=True to run on only
871  one rank.
872 
873  @param runner pass a produtil.prog.Runner object if desired.
874  This overrides any decision of what to run: the exename and
875  not_allranks will be ignored, and whatever is supplied in
876  runner is simply passed to produtil.run.run.
877 
878  @param sleeptime passed to produtil.run.run to determine how
879  often to check the child process. By default, the sleeptime
880  option in this task's config section is used, or if that is
881  absent, 30 seconds.
882 
883  @param exename if no runner is specified, self.getexe(exename)
884  is called to get the executable path, and it is run as an
885  MPI program."""
886  if sleeptime is None:
887  sleeptime=self.conffloat('sleeptime',30)
888  logger=self.log()
889  if runner is None:
890  exe=self.getexe(exename)
891  if not_allranks:
892  runner=mpirun(mpi(exe)) # run in one rank
893  else:
894  runner=mpirun(mpi(exe),allranks=True)
895  sync_frequently=self.confbool('sync_frequently',True)
896  if self.confbool('sync_frequently',True):
897  run(batchexe('sync'))
898  runsync()
899  stat=run(runner,logger=logger,sleeptime=sleeptime)
900  logger.info('%s: exit status %d'%(exename,stat))
901  if not check_last_lines('rsl.out.0000','SUCCESS COMPLETE',
902  logger=logger):
903  msg='%s: did not see SUCCESS COMPLETE in rsl.out.0000'%(exename,)
904  logger.error(msg)
905  raise RealNMMError(msg)
906  else:
907  logger.info('%s: SUCCESS COMPLETE in rsl.out.0000'%(exename,))
908  def final_prerun(self):
909  """! last step before running executable
910 
911  Called by self.run() just before calling run_exe. The
912  default implementation does nothing. This is intended to be
913  overridden by subclasses."""
914  def initial_prerun(self):
915  """!called immediately after cding to a scrub directory
916 
917  Called by self.run() after CDing to the new directory, but
918  before doing anything else. The default implementation does
919  nothing. This is intended to be overridden by subclasses."""
920  def run(self):
921  """!run the wrf or real_nmm
922 
923  Performs all work needed to run the program. Creates the work
924  directory, CD's to it, runs the initial_prerun(), link_fix(),
925  link_all_inputs(), make_namelist(), final_prerun(), run_exe(),
926  postrun() and deliver_products()."""
927  logger=self.log()
928  runhere=self.workdir
929  if os.path.exists(runhere):
930  logger.warning('%s: directory exists; will delete'%(runhere,))
931  assert(not os.path.samefile(self.getdir('WORKhwrf'),runhere))
932  shutil.rmtree(runhere)
933  atime=self.__wrf.simstart()
934  with NamedDir(runhere,keep=self.keeprun,logger=logger,
935  keep_on_error=True) as rundir:
936  try:
937  logger.info('%s running in directory %s'%(
938  self.taskname,realcwd()))
939  self.location=runhere
940  self.initial_prerun()
941  self.link_fix()
942  self.link_all_inputs()
943  self.make_namelist()
944  self.final_prerun()
945  self.run_exe()
946  self.postrun()
947  self.deliver_products()
948  except Exception as e:
949  logger.critical('%s failed: %s'%(self.taskname,str(e)),
950  exc_info=True)
951  raise
952  self.postmsg('%s: completed'%(self.taskname,))
953  def postrun(self):
954  """!called just after run_exe() succeeds.
955 
956  Called by self.run() just after run_exe returns successfully.
957  The default implementation does nothing; this is intended to
958  be overridden in subclasses."""
959  def deliver_products(self):
960  """!delivers products to their destinations
961 
962  Called by self.run() after postrun(), just before CDing out of
963  the work directory. This should deliver products to their
964  destinations. The default implementation raises
965  NotImplementedError. This MUST be overridden in subclasses."""
966  raise NotImplementedError('A WRFTaskBase subclass did not '
967  'implement deliver_products')
968  def make_namelist(self,filename='namelist.input',logger=None):
969  """!generates the wrf namelist.input file
970 
971  Runs set_ij_start (swcorner_dynamic) to generate the i & j
972  start locations for domain 2, then generates the namelist.
973  @param filename the namelist.input filename
974  @param logger the logging.Logger to log messages (optional)"""
975  if logger is None: logger=self.log()
976  domlat=self.conf.getfloat('config','domlat')
977  domlon=self.conf.getfloat('config','domlon')
978  s=self.wrf().swcorner_dynamic(self.getexe('swcorner_dynamic'),
979  self.storminfo, domlat,domlon,logger)
980  with open(filename,'wt') as nlin:
981  nlin.write(s)
982  def wrf(self):
983  """!Returns the WRFSimulation object used by this task."""
984  return self.__wrf
985  @property
986  def sim(self):
987  """!Returns the WRFSimulation object used by this task.
988 
989  This property has the same effect as self.wrf(), but this is a
990  property instead. Hence, you can type self.sim instead of
991  self.wrf()"""
992  return self.__wrf
993 
994  ## @var dt_epsilon
995  # epsilon value for time equality comparisons
996 
997  ## @var keeprun
998  # if False, temporary files are deleted, otherwise they're kept
999 
1000 ########################################################################
1002  """!a HWRFTask subclass that runs real_nmm
1003 
1004  This subclass of WRFTaskBase runs the real_nmm to generate inputs
1005  for the provided WRFSimulation."""
1006  def make_products(self):
1007  """!prepares products for deliver_products() and products()
1008 
1009  Generates produtil.datastore.FileProduct objects for the files
1010  that should be delivered in deliver_products() and listed by
1011  products()."""
1012  self.prod_wrfinput=FileProduct(self.dstore,'wrfinput_d01',
1013  self.taskname,location=os.path.join(self.outdir,'wrfinput_d01'))
1014  self.prod_wrfbdy=FileProduct(self.dstore,'wrfbdy_d01',self.taskname,
1015  location=os.path.join(self.outdir,'wrfbdy_d01'))
1016  self.prod_log=FileProduct(self.dstore,'rsl.out.0000',self.taskname,
1017  location=os.path.join(self.outdir,'rsl.out.0000'))
1018  self.prod_fort65=FileProduct(self.dstore,'fort.65',self.taskname,
1019  location=os.path.join(self.outdir,'fort.65'))
1020  def add_prep_hybrid(self,p):
1021  """!adds a prep_hybrid data source
1022 
1023  Modifies the simulation namelist generator so that it
1024  specifies use_prep_hybrid=T. Calls
1025  WRFTaskBase.add_prep_hybrid(p) to add the specified object as
1026  a prep_hybrid data source.
1027 
1028  @param p the src parameter to Prep2WRF.__init__()
1029  @return self"""
1030  ret=super(RealNMM,self).add_prep_hybrid(p)
1031  self.sim.nl.nl_set('domains','use_prep_hybrid',True)
1032  return ret
1033  def make_wrf(self,wrf):
1034  """!creates a WRFSimulation object for this class
1035 
1036  Creates a copy of the specified WRFSimulation wrf. The copy
1037  has a simulation length of one minute or one physics timestep,
1038  whichever is larger. Also, it produces wrfanl files for all
1039  domains.
1040  @param wrf the wrf argument to __init__"""
1041  wrf=wrf.copy()
1042  wrf.nl.nl_set('domains','use_prep_hybrid',self.use_prep_hybrid)
1043  return wrf
1044  def need_all_metgrid(self):
1045  """!returns True if all metgrid files are needed as input to
1046  this Task"""
1047  return True
1048  def initial_prerun(self):
1049  """!deletes the wrfinput_d01 and wrfbdy_d01."""
1050  logger=self.log()
1051  for f in ['wrfinput_d01','wrfbdy_d01']:
1052  try:
1053  remove_file(f,logger)
1054  except(EnvironmentError) as e:
1055  logger.warning('%s: cannot remove, but continuing anyway. '
1056  'Error was: %s'%(f,str(e)),exc_info=True)
1057  def run_exe(self,exename='real_nmm',sleeptime=30):
1058  """!runs the real_nmm program
1059 
1060  Tries several times to run the real_nmm. This is a workaround
1061  for problems encountered on Zeus when running the experimental
1062  9:3:1 HWRF. The real_nmm would exit for no apparent reason
1063  about 50% of the time. Eventually that was tracked down to a
1064  memory error caused by NetCDF, which forced us to use intio
1065  for some files, NetCDF for others, and PNetCDF for yet more
1066  files.
1067 
1068  @param exename the name of the real_nmm executable in the
1069  configuration file "exe" section.
1070  @param sleeptime passed to produtil.run.run to determine how
1071  often to check the child process."""
1072  logger=self.log()
1073  logger.info('real section is '+repr(self.section))
1074  maxtries=self.confint('max_tries',-999)
1075  logger.info('real max_tries is '+repr(maxtries))
1076  maxtries=max(1,maxtries)
1077  logger.info('after max(1,...), real max_tries is '+str(maxtries))
1078 
1079  logger.info('Will try to run real %d times'%(maxtries,))
1080  for trynum in xrange(maxtries):
1081  try:
1082  super(RealNMM,self).run_exe(exename,'prep' in
1083  self.inputs,sleeptime=sleeptime)
1084  logger.info('Real succeeded. Hurrah!')
1085  break
1086  except(EnvironmentError,RealNMMError,ExitStatusException) as e:
1087  if(trynum+1<maxtries):
1088  logger.warning(
1089  'Real failed %d time(s). Will retry after %d '
1090  'second sleep. Error: %s'%(
1091  trynum+1,sleeptime,str(e)),exc_info=True)
1092  time.sleep(sleeptime)
1093  logger.info(
1094  'Returned from sleeping. Will now retry real.')
1095  else:
1096  logger.error(
1097  'Real failed %d time(s). Aborting. Error: %s'
1098  %(maxtries,str(e)),exc_info=True)
1099  raise
1100  def get_wrfinput(self):
1101  """!returns the wrfinput file regardless of the time or
1102  domain"""
1103  return self.prod_wrfinput
1104  def wrfinput_at_time(self,atime=None,domain=None):
1105  """!returns the wrfinput file for a specified time and domain
1106  @param atime the time
1107  @param domain the WRF domain (an integer, string name or WRFDomain)
1108  @return the wrfinput file for the specified time and
1109  domain. Returns None if that time and domain are not
1110  valid."""
1111  if domain and domain!=self.wrf().get_moad(): return None
1112  if atime is not None and \
1113  within_dt_epsilon(atime,self.wrf().simstart(),self.
1114  dt_epsilon):
1115  return self.prod_wrfinput
1116  return None
1117  def wrfbdy_at_time(self,atime,domain=None):
1118  """!returns the wrfbdy file for a specified time and domain
1119  @param atime the time
1120  @param domain the WRF domain (an integer, string name or WRFDomain)
1121  @return the wrfbdy file for the specified time and
1122  domain. Returns None if that time and domain are not
1123  valid."""
1124  if domain and domain!=self.wrf().get_moad(): return None
1125  if atime is not None and \
1126  within_dt_epsilon(atime,self.wrf().simstart(),
1127  self.dt_epsilon):
1128  return self.prod_wrfbdy
1129  return None
1130  def fort65_at_time(self,atime,domain=None):
1131  """!returns the coupler fort.65 file for a specified time and domain
1132  @param atime the time
1133  @param domain the WRF domain (an integer, string name or WRFDomain)
1134  @return the fort.65 file for the specified time and
1135  domain. Returns None if that time and domain are not
1136  valid."""
1137  if domain and domain!=self.wrf().get_moad(): return None
1138  if atime is not None and \
1139  within_dt_epsilon(atime,self.wrf().simstart(),
1140  self.dt_epsilon):
1141  return self.prod_fort65
1142  return None
1143  def deliver_products(self):
1144  """!delivers products that were identified by make_products()"""
1145  produtil.fileop.makedirs(os.path.dirname(
1146  self.prod_wrfinput.location))
1147  self.prod_wrfinput.deliver(frominfo='wrfinput_d01',keep=False)
1148  self.prod_wrfbdy.deliver(frominfo='wrfbdy_d01',keep=False)
1149  self.prod_log.deliver(frominfo='rsl.out.0000',keep=False)
1150  self.prod_fort65.deliver(frominfo='fort.65',keep=False)
1151  def products(self):
1152  """!iterates over all products
1153  Iterates over all of this Task's products
1154  (produtil.datastore.FileProduct objects) created by
1155  make_products()"""
1156  yield self.prod_wrfinput
1157  yield self.prod_wrfbdy
1158  yield self.prod_log
1159  yield self.prod_fort65
1160  def make_namelist(self,filename='namelist.input',logger=None):
1161  """!Writes the namelist for real.
1162 
1163  Writes the namelist.input file for a call to real_nmm. This
1164  does the same as its parent class's implementation, except
1165  that the num_metgrid_levels is also overridden to match
1166  whatever WPS actually created (which may not match the
1167  original namelist).
1168  @param filename the name of the namelist.input file
1169  @param logger the logging.Logger object"""
1170  if logger is None: logger=self.log()
1171  exepath=self.getexe('hwrf_metgrid_levels')
1172  metfile=None
1173  for x in glob.glob('met_nmm.d01*.nc'):
1174  metfile=x
1175  self.wrf().set_metgrid_levels_from(exepath,metfile,logger)
1176  break
1177  if metfile is None:
1178  raise RealNMMError(
1179  'Could not find a met_nmm.d01*.nc file for running '
1180  'hwrf_metgrid_levels.')
1181  super(RealNMM,self).make_namelist(filename,logger)
1182 
1183  ## @var prod_wrfinput
1184  # the wrfinput output product
1185  # @protected
1186 
1187  ## @var prod_wrfbdy
1188  # the wrfbdy output product
1189  # @protected
1190 
1191  ## @var prod_log
1192  # the rsl.out.0000 output file product
1193  # @protected
1194 
1195  ## @var prod_fort65
1196  # the output product for the coupler fort.65 input file
1197  # @protected
1198 
1199 ########################################################################
1200 
1201 ## tricks wrf into outputting child domains' data
1202 # This is used by WRFAnl.make_wrf() to modify the simulation end time in
1203 # order to trick the WRF into outputting child domains' data.
1204 anl_fudge_factor=fractions.Fraction(2,3)
1205 
1206 ########################################################################
1208  """!runs a short WRF simulation to generate wrfanl files"""
1209  def make_wrf(self,wrf):
1210  """!creates the WRFSimulation object (self.wrf())
1211 
1212  Creates a copy of the specified WRFSimulation wrf. The copy
1213  has a simulation length of one minute or one physics timestep,
1214  whichever is larger. Calls the _set_wrf_proc_config to set
1215  I/O server and compute grid dimensions based on this
1216  HWRFTask's config section. Configures the WRF to produce
1217  wrfanl files for all domains.
1218 
1219  @param wrf the wrf argument to __init__
1220  @returns the new WRFSimulation"""
1221  wrf=wrf.copy()
1222  self._set_wrf_proc_config(wrf)
1223 
1224  # Determine the length of a physics timestep:
1225  MOAD=wrf.get_moad()
1226  endtime=to_fraction(MOAD.nl.nl_get('physics','nphs')) * MOAD.dt
1227  floored=max(60.0,float(math.floor(endtime*anl_fudge_factor)))
1228  assert(floored>0)
1229  # Set the simulation end time and history output frequency to
1230  # the physics timestep:
1231  wrf.analysis_out()
1232  wrf.set_timing(end=floored)
1233  assert(wrf.simstart()<wrf.simend())
1234  wrf.add_output('history',step=floored)
1235  return wrf
1236  @property
1237  def anltime(self):
1238  """!the time for which analysis files are generated."""
1239  return self.wrf().simstart()
1240  def make_products(self):
1241  """!creates FileProduct objects for all wrfanl files."""
1242  self._products=dict()
1243  MOAD=self.wrf().get_moad()
1244  for domain in self.wrf():
1245  if domain==MOAD: continue
1246  pname=self.wrf().analysis_name(domain)
1247  loc=os.path.join(self.outdir,pname)
1248  self._products[domain]=FileProduct(self.dstore,pname,
1249  self.taskname,location=loc)
1250  def get_wrfanl(self,domain):
1251  """!returns the wrfanl product for the specified domain
1252  @param domain the domain for whom the wrfanl file is requested"""
1253  if domain is None: return None
1254  if not domain in self.sim: return None
1255  domain=self.sim[domain]
1256  if domain.is_moad(): return None
1257  return self._products[domain]
1258  def wrfanl_at_time(self,atime,domain):
1259  """!get a wrfanl file for a specified domain and analysis time
1260 
1261  Returns the wrfanl product for the specified domain and analysis
1262  time, if one exists, and otherwise, None.
1263  @param domain the domain (an integer, string name or WRFDomain)
1264  @param atime the analysis time """
1265  if atime!=self.anltime: return None
1266  if domain not in self.wrf(): return None
1267  d=self.wrf()[domain]
1268  if d.is_moad() :return None
1269  return self._products[d]
1270  def deliver_products(self):
1271  """!delivers products during a call to run()
1272 
1273  Delivers the products (FileProduct objects) that were
1274  identified by make_products."""
1275  logger=self.log()
1276  logger.info('%s: make directory'%(self.outdir,))
1277  first=True
1278  for domain in self.wrf():
1279  if domain not in self._products: continue # skip MOAD
1280  p=self._products[domain]
1281  if first:
1282  first=False
1283  produtil.fileop.makedirs(os.path.dirname(p.location))
1284  p.deliver(frominfo=os.path.basename(p.location),logger=logger,
1285  keep=False)
1286  def products(self,domain=None):
1287  """!iterates over products
1288  Iterates over all Product objects that were identified by
1289  make_products.
1290  @param domain if provided, only this domain's Products are yielded"""
1291  if domain:
1292  if self._products.has_key(domain):
1293  yield self._products[domain]
1294  else:
1295  for domain in self.wrf():
1296  if self._products.has_key(domain):
1297  yield self._products[domain]
1298 
1299 ########################################################################
1301  """!runs a short WRF simulation to generate wrfanl
1302  files named "ghost"
1303 
1304  This is the same as the superclass, WRFAnl, but it renames wrfanl
1305  files to "ghost" instead of "wrfanl". Also, history output is
1306  disabled."""
1307  def make_wrf(self,wrf):
1308  """!makes the WRFSimulation object for this class.
1309 
1310  Creates a copy of the WRFSimulation object wrf. This first
1311  calls the WRFAnl.make_wrf, and then disables the history
1312  stream.
1313  @param wrf the wrf argument to __init__"""
1314  wrf=super(WRFGhost,self).make_wrf(wrf)
1315  wrf.set_wrfanl_outname('ghost_d<domain>')
1316  wrf.add_output('history',step=3600*3,end=9*3600,start=3*3600)
1317  return wrf
1318  def get_ghost(self,domain):
1319  """!same as get_wrfanl()
1320  @param domain the domain of interest"""
1321  if domain is None: return None
1322  if not domain in self.sim: return None
1323  domain=self.sim[domain]
1324  if domain.is_moad(): return None
1325  return self._products[domain]
1326 
1327 ########################################################################
1328 
1329 ##request only the outermost domain
1330 #
1331 #Special value for the trakdoms argument to WRFAnl4Trak.__init__().
1332 #Requests that only the outermost domain be used in generating the
1333 #parent model track
1334 JUST_MOAD=object()
1335 
1336 ##request all domains
1337 #
1338 #Special value for the trakdoms argument to WRFAnl4Trak.__init__().
1339 #Requests that all domains be used in generating the parent model track
1340 ALL_DOMS=object()
1341 
1342 ########################################################################
1344  """!runs wrfanl and generates a track file
1345 
1346  This subtask of WRFAnl modifies the time of the 1 minute forecast
1347  wrfout file from the outer domain to have the analysis time
1348  instead so that the tracker can be run to get the initial storm
1349  location."""
1350  def __init__(self,dstore,conf,section,wrf,trakdoms=JUST_MOAD,
1351  trakname='trankin_d<domain>',**kwargs):
1352  """!constructor
1353 
1354  Constructor for WRFAnl4Trak.
1355  @param trakdoms what domains should be used for the tracks?
1356  either JUST_MOAD or ALL_DOMS or a list of WRFDomain objects
1357  @param trakname the track output filename.
1358  @param dstore,conf,section,wrf,kwargs passed to superclass constructor"""
1359  self._trakdoms=trakdoms
1360  self._trackin_name_pattern=trakname
1361  super(WRFAnl4Trak,self).__init__(dstore,conf,section,wrf,**kwargs)
1362  def track_domains(self):
1363  """! iterates over track input domains
1364 
1365  Iterates over domains specified by the trakdoms parameter to
1366  __init__()."""
1367  if self._trakdoms is JUST_MOAD:
1368  yield self.sim.get_moad()
1369  elif self._trakdoms is ALL_DOMS:
1370  for domain in self.sim: yield domain
1371  else:
1372  for domain in self._trakdoms: yield self.sim[domain]
1373  def make_products(self):
1374  """!creates Product objects for deliver_products() and products()
1375 
1376  Called from constructor. This make_products adds all products
1377  produced by WRFAnl.make_products, but adds a product for the
1378  outer domain one minute forecast history stream (wrfout) file
1379  called "trackin_d<domain>". That file is intended to be used
1380  to generate the parent domain vortex information."""
1381  WRFAnl.make_products(self)
1382 
1383  sim=self.sim
1384  self.trackin_name=dict()
1385  self.trackin_prod=dict()
1386  trackin_name_pattern=self._trackin_name_pattern
1387  for domain in self.track_domains():
1388  idom=sim[domain].get_grid_id()
1390  trackin_name_pattern,idom,
1391  sim.simstart(),sim.get_nocolons())
1392  self.trackin_name[domain]=name
1393  loc=os.path.join(self.outdir,name)
1394  prod=FileProduct(self.dstore,name,self.taskname,location=loc)
1395  prod.location=loc
1396  prod['stream']='history'
1397  self.trackin_prod[domain]=prod
1398 
1399  def postrun(self):
1400  """!modifies trakin_* files to say they're at time 0
1401 
1402  Calls retime_wrfout for all domains whose trackin_d0X files
1403  are requested. This produces the modified 1 minute forecast
1404  wrfout files that lie and say they're from the analysis time."""
1405  for domain in self.track_domains():
1406  self.retime_wrfout(domain)
1407 
1408  def retime_wrfout(self,domain):
1409  """!modifies a trakin file's internal timestamp
1410 
1411  If possible, modifies the stated output time in the one minute
1412  forecast trackin_d<domain> file to the analysis time. Does
1413  this for one domain. For intio files, it does not modify the
1414  file, but instead simply renames it. That is done because, at
1415  last test, the post does not actually look in an intio wrfout
1416  file for the time, so no modification is necessary.
1417  @param domain the domain whose trakin file is being modified"""
1418  stream='history'
1419  logger=self.log()
1420  sim=self.wrf()
1421  dom=sim[domain]
1422  name=self.trackin_name[domain]
1423  logger.info('simend = '+repr(self.wrf().simend())+' = '+str(self.wrf().simend()))
1424  infile=dom.get_output(stream,self.wrf().simend(),logger=logger)
1425  logger.info('infile = '+str(infile.path()))
1426  io_form=sim.io_form_for(stream)%100
1427  if io_form == 1:
1428  logger.warning('%s: renaming instead of running wrfout_newtime '
1429  'since file is (probably) not NetCDF: '
1430  'io_form=%d'%(infile,io_form))
1431  os.rename(infile.path(),name)
1432  return
1433  try:
1434  shutil.copy2(infile.path(),name)
1435  checkrun(bigexe(self.getexe('wrfout_newtime'))[name,
1436  sim.simstart().strftime('%Y-%m-%d_%H:%M:%S')])
1437  except Exception as e:
1438  logger.error('%s (from %s): unable to modify time in NetCDF '
1439  'file: %s'%(infile.path(), name, str(e)))
1440  raise
1441 
1442  def deliver_products(self):
1443  """!delivers products
1444 
1445  Delivers all products (FileProduct objects) created by
1446  make_products, including the new trackin_d<domain> added by
1447  this subclass, and all products added by the superclass
1448  WRFAnl."""
1449  super(WRFAnl4Trak,self).deliver_products()
1450  for d,p in self.trackin_prod.iteritems():
1451  p.deliver(frominfo=self.trackin_name[d],
1452  logger=self.log(),keep=False)
1453 
1454  def get_trackin(self,domain=None):
1455  """!returns a trakin (trackin) Product
1456 
1457  Returns a trakin (trackin) Product. If a domain is specified,
1458  returns the trackin file for that domain. Otherwise, the
1459  MOAD (outermost domain) is assumed.
1460  @param domain None, or the domain of interest
1461  @return a Product for the trakin file for the specified domain"""
1462  if domain is None:
1463  domain=self.sim.get_moad()
1464  if domain in self.trackin_prod:
1465  return self.trackin_prod[domain]
1466  return None
1467 
1468  def products(self,domain=None,domains=None,time=None,stream=None):
1469  """!iterates over all products from this task.
1470 
1471  Iterates over all products from this task. This class adds
1472  the trackin_d0* files.
1473 
1474  @param domain the domain of interest
1475  @param domains a list of domains of interest
1476  @param time the analysis time
1477  @param stream the output stream"""
1478  if not domains and not time and not stream:
1479  for p in WRFAnl.products(self,domain=domain): yield p
1480 
1481  if stream and stream!='history': return
1482  if time and time!=self.wrf().simstart(): return
1483 
1484  for d,p in self.trackin_prod.iteritems():
1485  simd=self.sim[d]
1486  if domains and simd not in domains: continue
1487  if domain is not None and domain!=simd: continue
1488  yield p
1489  ##@var trackin_name
1490  # mapping from WRFDomain to product name for trakin files
1491  # @protected
1492 
1493  ##@var trackin_prod
1494  # mapping from WRFDomain to product for trakin files
1495  # @protected
1496 
1497 ########################################################################
1499  """!runs wrf to generate ghost (wrfanl) and wrfout files
1500 
1501  This class generates wrfghost files, and wrfout files, for each
1502  domain. The wrfout files happen at the end of a roughly one
1503  minute forecast so that they can be correctly post-processed.
1504  However, their internal timestamp has been changed to be for the
1505  analysis time. This class derives from WRFAnl4Trak instead of
1506  WRFGhost to reuse the wrfout renamer functionality, but it may be
1507  used in place of a WRFGhost object.
1508 
1509  Note that a wrf ghost file is a wrfanl file. There is no
1510  difference whatsoever, except in the nomenclature in the HWRF
1511  initialization."""
1512  def __init__(self,dstore,conf,section,wrf,trakdoms=ALL_DOMS,
1513  trakname='ghout_d<domain>',**kwargs):
1514  """!constructor
1515 
1516  Creates a WRFGhostForPost, passing all arguments to
1517  WRFAnl4Trak.__init__()"""
1518  super(WRFGhostForPost,self).__init__(
1519  dstore,conf,section,wrf,trakdoms,trakname,**kwargs)
1520  def make_wrf(self,wrf):
1521  """!creates a WRFSimulation that calls its wrfanl files "ghost"
1522  files instead."""
1523  wrf=super(WRFGhostForPost,self).make_wrf(wrf)
1524  wrf.set_wrfanl_outname('ghost_d<domain>')
1525  return wrf
1526 
1527  def get_ghout(self,domain):
1528  """!returns the ghost wrfout product for the specified domain."""
1529  assert(domain is not None)
1530  return self.get_trackin(domain)
1531 
1532  def get_ghost(self,domain):
1533  """!same as get_wrfanl()
1534 
1535  This works exactly like get_wrfanl()
1536  @param domain the domain of interest"""
1537  if domain is None: return None
1538  if not domain in self.sim: return None
1539  domain=self.sim[domain]
1540  if domain.is_moad(): return None
1541  return self._products[domain]
1542 
1543 ########################################################################
1545  """!The uncoupled HWRF forecast Task.
1546 
1547  This class runs an atmosphere-only WRF run, using wrfanl files
1548  from a prior WRFAnl simulation. It encapsulates an
1549  ExternalWRFTask, which performs the actual tracking of products.
1550  This allows the post-processors and wrf copy tasks to keep track
1551  of the model's output while the model is running. That subtask
1552  shows up as ".mon" (for "monitor") relative to this task (so if
1553  this task is wrfatmos, then the external task is wrfatmos.mon)."""
1554  def __init__(self,dstore,conf,section,wrf,keeprun=True,
1555  wrfdiag_stream='auxhist1',**kwargs):
1556  """!WRFAtmos constructor
1557 
1558  The constructor for WRFAtmos.
1559  @param dstore the produtil.datastore.Datastore object
1560  @param conf the HWRFConfig object for configuration information
1561  @param section the section name within that HWRFConfig object
1562  @param wrf the hwrf.wrf.WRFSimulation that is to be executed
1563  @param keeprun True means the output directory should NOT be deleted
1564  @param wrfdiag_stream stream that generates wrfdiag files
1565  @param kwargs passed to the parent class constructor."""
1566  self._wrfdiag_stream=wrfdiag_stream
1567  # Create the WRFTaskBase first since it creates a copy of the
1568  # wrf, modifying the namelist in various ways:
1569  super(WRFAtmos,self).__init__(dstore,conf,section,wrf,
1570  keeprun=keeprun,**kwargs)
1571  # Create an ExternalWRFTask object to handle actual product
1572  # generation:
1573  self.__ex=hwrf.wrf.ExternalWRFTask(dstore=self.dstore,conf=self.conf,
1574  section=self.section,wrf=self.wrf(),
1575  taskname=self.taskname+'.mon',
1576  location=self.workdir,outdir=self.workdir)
1577  self.__ex['outdir']=self.workdir # make sure it is set even on rerun
1578  def make_wrf(self,wrf):
1579  """!creates a WRFSimulation for an uncoupled forecast run
1580 
1581  Creates a WRFSimulation for an uncoupled forecast run. Runs
1582  the superclass WRFAtmos.make_wrf. Instructs the resulting
1583  WRFSimulation to take analysis files as input, and calls the
1584  _set_wrf_proc_config to set I/O server and compute grid
1585  dimensions.
1586  @param wrf the wrf argument to __init__"""
1587  wrf=super(WRFAtmos,self).make_wrf(wrf)
1588  wrf.analysis_in()
1589  logger=self.log()
1590  self._set_wrf_proc_config(wrf,logger)
1591  return wrf
1592  def unrun(self):
1593  """!deletes output files
1594 
1595  Deletes output files. See the
1596  hwrf.wrf.ExternalWRFTask.unrun for details."""
1597  self.__ex.unrun()
1598  def run(self):
1599  """!runs the uncoupled WRF forecast
1600 
1601  Performs all work needed to run the program. Sets the state
1602  to produtil.datastore.UNSTARTED. Creates the work directory,
1603  CD's to it, runs the initial_prerun, link_fix,
1604  link_all_inputs, make_namelist, final_prerun, run_exe, postrun
1605  and deliver_products."""
1606  self.state=UNSTARTED
1607  self.__ex.state=UNSTARTED
1608  super(WRFAtmos,self).run()
1609  def products(self,**kwargs):
1610  """!iterates over all WRF products.
1611 
1612  Iterates over all WRF products. See the
1613  hwrf.wrf.ExternalWRFTask.products() for details.
1614  @param kwargs passed to hwrf.wrf.ExternalWRFTask.products()"""
1615  for p in self.__ex.products(**kwargs):
1616  yield p
1617  def exproducts(self,**kwargs):
1618  """!iterates over all WRF products.
1619 
1620  Synonym for products(). Iterates over all WRF products. See the
1621  hwrf.wrf.ExternalWRFTask.products() for details.
1622  @param kwargs passed to hwrf.wrf.ExternalWRFTask.products()"""
1623  for p in self.__ex.products(**kwargs):
1624  yield p
1625  def wrf_check(self,**kwargs):
1626  """!checks the status of the WRF simulation.
1627 
1628  Checks the status of the WRF simulation. Should only be
1629  used while the simulation is running. This is intended to be
1630  run by jobs other than the WRF job, such as the
1631  post-processors, to monitor the WRF simulation as it
1632  progresses.
1633 
1634  @param kwargs passed to hwrf.wrf.ExternalWRFTask.wrf_check()"""
1635  self.__ex.wrf_check(**kwargs)
1636  def run_exe(self,exename='wrf',not_allranks=False,runner=None,
1637  sleeptime=None):
1638  """!runs the wrf program
1639 
1640  Called from run(), this runs the wrf program after the
1641  directory is set up.
1642 
1643  @param exename sent to getexe() to get the path to wrf if
1644  runner is unspecified
1645  @param not_allranks if True, only use one MPI rank. Do not use.
1646  @param runner an alternative produtil.prog.Runner to execute.
1647  @param sleeptime seconds to sleep between checks of the wrf
1648  executables completion. The default sleeptime, if none is
1649  specified is 60 seconds rather than the usual 30."""
1650  if sleeptime is None:
1651  sleeptime=self.conffloat('sleeptime',60)
1652  super(WRFAtmos,self).run_exe(
1653  exename=exename,not_allranks=not_allranks,
1654  runner=runner,sleeptime=sleeptime)
1655  def update_state(self):
1656  """!checks the wrf state and updates it in the HWRF database file
1657 
1658  Checks the state of the WRF simulation and copies that
1659  information to self.state in the produtil.datastore.Datastore.
1660  See hwrf.wrf.ExternalWRFTask.update_state() for details."""
1661  with self.dstore.transaction() as t:
1662  self.__ex.update_state()
1663  self.state=self.__ex.state
1664  def deliver_products(self,*args,**kwargs):
1665  """!does nothing
1666 
1667  Has no effect. This is present only because it is a
1668  requirement of the parent class. No delivery is required
1669  because the products are all UpstreamFile objects, so the
1670  delivery state is set by the post-processors when calling the
1671  "check" function of each product.
1672  @param args,kwargs ignored"""
1673 
1674 ########################################################################
1675 class AnalysisCycle(WRFGhost):
1676  """!runs wrf for an analysis cycle
1677 
1678  This class is similar to a WRFGhost run, except that it runs a
1679  longer simulation (typically 1-6 hours), and provides wrfghost and
1680  wrfinput files at the end of it. It also requires wrfghost and
1681  wrfinput files as input. Hence, one can run another AnalysisCycle
1682  after this one, using the output of the previous. Note that this
1683  implementation relies on the fact that wrfghost, wrfanl and
1684  restart files are all exactly the same file format (just different
1685  times and domains)."""
1686  def __init__(self,dstore,conf,section,wrf,simlen=None,
1687  keeprun=False,**kwargs):
1688  """!constructor for AnalysisCycle
1689 
1690  Constructor for the AnalysisCycle class.
1691  @param dstore the produtil.datastore.Datastore to use
1692  @param conf the hwrf.config.HWRFConfig that provides configuration ifnormation
1693  @param section the config section in conf
1694  @param wrf the hwrf.wrf.WRFSimulation object that is being run
1695  @param keeprun if True, the simulation temporary files are not deleted
1696  @param simlen simulation length in seconds
1697  @param kwargs passed unmodified to FcstTask.__init__()"""
1698  self.__prodcache=dict()
1699  if simlen is not None:
1700  simlen=to_timedelta(simlen)
1701  self.__simlen=simlen
1702  super(AnalysisCycle,self).__init__(dstore,conf,section,wrf,
1703  keeprun,**kwargs)
1704  ## @var __simlen
1705  # The simulation length as a datetime.timedelta
1706  # @private
1707  def make_products(self):
1708  """!called from constructor, creates Products
1709 
1710  This is called from the WRFTaskBase constructor. It creates
1711  products that will be used by deliver_products() and
1712  products()"""
1713  self._products=dict()
1714  MOAD=self.wrf().get_moad()
1715  logger=self.log()
1716 
1717  for domain in self.wrf():
1718  pname='wrfinput_d%02d'%(domain.get_grid_id(),)
1719  loc=os.path.join(self.outdir,pname)
1720  if domain==MOAD:
1722  self.dstore,pname,self.taskname,location=loc)
1723  else:
1724  self._products[domain]=FileProduct(
1725  self.dstore,pname,self.taskname,location=loc)
1726 
1727  ## @var prod_wrfinput
1728  # the wrfinput output file at the end of the simulation
1729  # @protected
1730 
1731  ## @var _products
1732  # a mapping from WRFDomain to FileProduct for products other than wrfinput
1733  # @protected
1734 
1735  @property
1736  def anlouttime(self):
1737  """!analysis cycle end time
1738 
1739  The time at end of the analysis cycle, at which the output
1740  wrfanl and wrfinput files are available."""
1741  return self.wrf().simend()
1742 
1743  @property
1744  def anlintime(self):
1745  """!analysis cycle start time
1746 
1747  The time at beginning of the analysis cycle, at which the
1748  input wrfanl and wrfinput files must be provided."""
1749  return self.wrf().simstart()
1750 
1751  @property
1752  def simlen(self):
1753  """!simulation length
1754 
1755  The requested length of the simulation. Note that this may
1756  not be the same as the actual simulation length
1757  (self.anlouttime-self.anlintime) due to the model timestep."""
1758  if self.__simlen is None:
1759  self.__simlen=to_timedelta(self.confstr('simlen'))
1760  return self.__simlen
1761 
1762  def make_wrf(self,wrf):
1763  """!creates the WRFSimulation object
1764 
1765  Called from the constructor. Generates and returns a new
1766  WRFSimulation object that meets these requirements:
1767  1. Read in wrfanl and wrfinput files
1768  2. Output a restart and wrfinput file after self.simlen time
1769  3. Disable history and auxhist 1-3.
1770  @param wrf the wrf argument to __init__"""
1771 
1772  wrf=wrf.copy()
1773 
1774  # Read in analysis files:
1775  wrf.analysis_in()
1776 
1777  # Get the simulation length that was requested in either the
1778  # constructor or conf section:
1779  simlen=to_timedelta(self.simlen)
1780 
1781  # Change the simulation end time to the specified time:
1782  wrf.set_timing(end=simlen)
1783 
1784  # Output a restart file at the end of the simulation:
1785  wrf.add_output('restart',step=simlen,start=simlen,end=simlen)
1786 
1787  # Output an input file at the end of the simulation. The rest
1788  # of this code assumes the file is written only once, with
1789  # only one output frame.
1790  wrf.add_output('inputout',step=simlen,start=simlen,end=simlen,
1791  outname='wrfinputout_d<domain>')
1792  input_outname=wrf.nl.nl_get('time_control','input_outname')
1793  assert(input_outname=='wrfinputout_d<domain>')
1794 
1795  for domain in wrf:
1796  assert(domain.has_output('inputout'))
1797  assert(domain.has_output('restart'))
1798 
1799  # Make the domain not move by setting ntrack to a huge value
1800  for domain in wrf:
1801  domain.nl.nl_set('physics','ntrack',1000)
1802 
1803  # Disable output for four streams by moving the stream output
1804  # start time to after the end of the simulation. This is to
1805  # work around buggy code that wants the WRF history stream
1806  # frequency to decide how to accumulate certain variables.
1807  logger=self.log()
1808  if not wrf.has_output('history'):
1809  wrf.add_output('history',start=simlen*2,step=simlen,end=simlen*3)
1810  for stream in [ 'auxhist1', 'auxhist2', 'auxhist3', 'history' ]:
1811  io_form=wrf.io_form_for(stream)
1812  for domain in wrf:
1813  if domain.has_output(stream):
1814  domain.hide_output(stream)
1815 
1816  return wrf
1817  def wrfinput_at_time(self,atime=None,domain=None):
1818  """!the wrfinput_d01 output Product for the specified time and domain
1819 
1820  @return the wrfinput output Product for the specified time and
1821  domain or None if that time and domain are not valid.
1822 
1823  @param atime the time
1824  @param domain the domain (integer, string name or WRFDomain)"""
1825  if domain and domain!=self.wrf().get_moad(): return None
1826  if atime is not None and \
1827  within_dt_epsilon(atime,self.anltime,self.dt_epsilon):
1828  return self.prod_wrfinput
1829  return None
1830  def get_wrfinput(self):
1831  """!the wrfinput_d01 output product
1832 
1833  @returns the output wrfinput Product"""
1834  return self.prod_wrfinput
1835  def deliver_products(self):
1836  """!delivers products from temporary areas
1837 
1838  Copies output products from temporary areas to long-term
1839  storage locations."""
1840  MOAD=self.wrf().get_moad()
1841  logger=self.log()
1842  when=self.anlouttime
1843 
1844  logger.info('Deliver products: rst and wrfinput at time %s'%(repr(when),))
1845  produtil.fileop.makedirs(self.outdir,logger=logger)
1846 
1847  for domain in self.wrf():
1848  pname='wrfinput_d%02d'%(domain.get_grid_id(),)
1849  loc=os.path.join(self.outdir,pname)
1850  if domain==MOAD:
1851  fromloc=domain.get_output('inputout',when,logger=logger).path()
1852  logger.info('Deliver moad %s from %s'%(str(domain),fromloc))
1853  self.prod_wrfinput.deliver(
1854  frominfo=fromloc,logger=logger,keep=False)
1855  else:
1856  fromloc=domain.get_output('restart',when,logger=logger).path()
1857  logger.info('Deliver nest %s from %s'%(str(domain),fromloc))
1858  self._products[domain].deliver(
1859  frominfo=fromloc,logger=logger,keep=False)
1860  for out in domain.get_outputs('history'):
1861  fromloc=out.path()
1862  logger.info('Deliver nest %s from %s'%(str(domain),fromloc))
1863  toloc=os.path.join(self.outdir,fromloc)
1864  logger.info('Deliver nest %s to %s'%(str(domain),toloc))
1866  fromloc,toloc,keep=True,logger=logger)
1867  def as_product(self,wrfout,relocate=False):
1868  """Converts a WRFOutput to a Product.
1869 
1870  Returns a Product for a WRFOutput. The Product is cached in
1871  self.__prodcache and as_product() will return that cached
1872  value if one is available. Otherwise, a new one is created.
1873  Call clear_cached_products() to clear the cache.."""
1874  if self.__prodcache.has_key(wrfout):
1875  return self.__prodcache[wrfout]
1876  rel=wrfout.path()
1877  #outdir=os.path.join(self['outdir'],rel)
1878  outdir=self['outdir']
1879  assert(outdir is not None)
1880  loc=os.path.join(outdir,os.path.basename(wrfout.path()))
1881  with self.dstore.transaction() as t:
1882  uf=UpstreamFile(self.dstore,category=self.taskname,
1883  prodname=rel,location=loc)
1884  uf['stream']=wrfout.stream()
1885  uf['location']=loc
1886  if relocate: uf.location=loc
1887  self.__prodcache[wrfout]=uf
1888  return uf
1889  def products(self,domain=None,domains=None,stream=None,time=None,relocate=False):
1890  """!iterates over all selected products
1891 
1892  Iterates over all products for the specified domain, or all
1893  products if the domain is unspecified or None.
1894  @param domain the requested domain (an integer, string name or WRFDomain)
1895  @param domains a list of domains of interest
1896  @param time the analysis time
1897  @param stream the output stream"""
1898  if domains is not None:
1899  for d in domains:
1900  if stream is None:
1901  for out in d.get_all_outputs(time):
1902  yield self.as_product(out,relocate=relocate)
1903  elif time is None:
1904  for out in d.get_outputs(stream):
1905  yield self.as_product(out,relocate=relocate)
1906  else:
1907  yield self.as_product(d.get_output(stream,time),
1908  relocate=relocate)
1909  else:
1910  if domain is None:
1911  yield self.prod_wrfinput
1912  elif domain==self.wrf().get_moad():
1913  yield self.prod_wrfinput
1914  return
1915  elif domain is None:
1916  yield self.prod_wrfinput
1917  for p in super(AnalysisCycle,self).products(domain):
1918  yield p
def initial_prerun(self)
deletes the wrfinput_d01 and wrfbdy_d01.
Definition: fcsttask.py:1048
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def get_inputs(self, logger, atime, domain, wrfanl='wrfanl', just_check=False, kwargs)
Links wrfanl files for the specified domain and time.
Definition: fcsttask.py:254
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
Create namelists, monitor wrf simulations, generate filenames.
Definition: wrf.py:1
Links real_nmm fort.65 files the current directory for a wrf run.
Definition: fcsttask.py:209
def wrfanl_at_time(self, atime, domain)
get a wrfanl file for a specified domain and analysis time
Definition: fcsttask.py:1258
def run(self)
runs the uncoupled WRF forecast
Definition: fcsttask.py:1598
def products(self)
iterates over all products Iterates over all of this Task's products (produtil.datastore.FileProduct objects) created by make_products()
Definition: fcsttask.py:1151
def simlen(self)
simulation length
Definition: fcsttask.py:1752
prod_wrfinput
the wrfinput output file at the end of the simulation
Definition: fcsttask.py:1721
def has_input(self, typename)
is there any input data of this type to this task?
Definition: fcsttask.py:382
abstract base class of anything that runs or prepares input for a forecast model
Definition: fcsttask.py:363
def link_input(self, typenames, just_check=False, kwargs)
link or check for inputs
Definition: fcsttask.py:417
def make_wrf(self, wrf)
creates a WRFSimulation object for this class
Definition: fcsttask.py:670
def get_inputs(self, just_check=False, kwargs)
copies or links input files.
Definition: fcsttask.py:73
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def get_inputs(self, logger, atime, domain, just_check=False, kwargs)
Links coupler input data for the specified domain and time.
Definition: fcsttask.py:218
def deliver_products(self)
delivers products during a call to run()
Definition: fcsttask.py:1270
def wrfinput_at_time
the wrfinput_d01 output Product for the specified time and domain
Definition: fcsttask.py:1817
def deliver_products(self)
delivers products to their destinations
Definition: fcsttask.py:959
runs wrf to generate ghost (wrfanl) and wrfout files
Definition: fcsttask.py:1498
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def get_inputs(self, logger, atime, domain, just_check=False, kwargs)
Links a wrfbdy file for the specified domain and time.
Definition: fcsttask.py:296
def add_wrfinput(self, r)
adds a wrfinput_d01 input source
Definition: fcsttask.py:719
A subclass of Product that represents file delivery.
Definition: datastore.py:856
def need_all_metgrid(self)
Returns True if all metgrid files are needed as input to this Task.
Definition: fcsttask.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def get_wrfanl(self, domain)
returns the wrfanl product for the specified domain
Definition: fcsttask.py:1250
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def anlintime(self)
analysis cycle start time
Definition: fcsttask.py:1744
a HWRFTask subclass that runs real_nmm
Definition: fcsttask.py:1001
def get_trackin
returns a trakin (trackin) Product
Definition: fcsttask.py:1454
storminfo
The hwrf.storminfo.StormInfo describing the vitals information for the storm processed by this HWRFTa...
Definition: hwrftask.py:94
def add_prep_hybrid(self, p)
adds a prep_hybrid data source
Definition: fcsttask.py:1020
base class of classes that run wrf or real_nmm
Definition: fcsttask.py:488
def retime_wrfout(self, domain)
modifies a trakin file's internal timestamp
Definition: fcsttask.py:1408
def check_input(self, typenames, kwargs)
check if input data is available
Definition: fcsttask.py:403
Links Geogrid data to the current directory for a wrf or real_nmm run.
Definition: fcsttask.py:134
def _set_wrf_proc_config
sets nproc_x, nproc_y and I/O server settings
Definition: fcsttask.py:514
runs a short WRF simulation to generate wrfanl files
Definition: fcsttask.py:1207
def get_ghout(self, domain)
returns the ghost wrfout product for the specified domain.
Definition: fcsttask.py:1527
runs wrfanl and generates a track file
Definition: fcsttask.py:1343
__simlen
The simulation length as a datetime.timedelta.
Definition: fcsttask.py:1701
def wrfinput_at_time
returns the wrfinput file for a specified time and domain
Definition: fcsttask.py:1104
domain
the domain for which this object provides wrfanl files, or None if it provides wrfanl files for all d...
Definition: fcsttask.py:252
low-level wrf implementation, underlying hwrf.wrf
Definition: wrfbase.py:1
trackin_name
mapping from WRFDomain to product name for trakin files
Definition: fcsttask.py:1384
trackin_prod
mapping from WRFDomain to product for trakin files
Definition: fcsttask.py:1385
_wrfdiag_stream
WRFAtmos constructor.
Definition: fcsttask.py:1566
def run_exe
runs the wrf program
Definition: fcsttask.py:1637
def __init__(self, dstore, conf, section, wrf, trakdoms=JUST_MOAD, trakname='trankin_d< domain >', kwargs)
constructor
Definition: fcsttask.py:1351
def final_prerun(self)
last step before running executable
Definition: fcsttask.py:908
def check_all_inputs(self)
Checks to see if all needed input is available.
Definition: fcsttask.py:813
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
def anlouttime(self)
analysis cycle end time
Definition: fcsttask.py:1736
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
Links prep_hybrid files to the current directory for a real_nmm run.
Definition: fcsttask.py:317
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
prod_log
the rsl.out.0000 output file product
Definition: fcsttask.py:1016
prod_wrfbdy
the wrfbdy output product
Definition: fcsttask.py:1014
def wrf(self)
Returns the WRFSimulation object used by this task.
Definition: fcsttask.py:982
def __init__(self, dstore, conf, section, wrf, keeprun=True, wrfdiag_stream='auxhist1', kwargs)
WRFAtmos constructor.
Definition: fcsttask.py:1555
def products
iterates over all products from this task.
Definition: fcsttask.py:1468
A piece of data produced by a Task.
Definition: datastore.py:716
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
def update_state(self)
checks the wrf state and updates it in the HWRF database file
Definition: fcsttask.py:1655
def sim(self)
Returns the WRFSimulation object used by this task.
Definition: fcsttask.py:986
def deliver_products(self)
delivers products that were identified by make_products()
Definition: fcsttask.py:1143
def add_geogrid(self, g)
adds a geogrid input source
Definition: fcsttask.py:680
def make_products(self)
called from constructor, creates Products
Definition: fcsttask.py:1707
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
The uncoupled HWRF forecast Task.
Definition: fcsttask.py:1544
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
def get_inputs(self, logger, domain, just_check=False, kwargs)
Links geogrid data if any is available for the specified domain.
Definition: fcsttask.py:143
def add_real(self, r)
add a fort.65, wrfinput_d01 and wrfbdy_d01 input source
Definition: fcsttask.py:754
Time manipulation and other numerical routines.
Definition: numerics.py:1
def make_wrf(self, wrf)
creates the WRFSimulation object
Definition: fcsttask.py:1762
def link_all_inputs
Links all inputs provided by the various add_* member functions.
Definition: fcsttask.py:816
def postrun(self)
modifies trakin_* files to say they're at time 0
Definition: fcsttask.py:1399
def get_inputs(self, logger, atime, domain, just_check=False, kwargs)
Links wrfinput_d01 files.
Definition: fcsttask.py:193
def add_prep_hybrid(self, p)
adds a prep_hybrid input source
Definition: fcsttask.py:702
def products(self, kwargs)
iterates over all WRF products.
Definition: fcsttask.py:1609
def add_fort65(self, r)
adds a coupler fort.65 input source
Definition: fcsttask.py:743
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def run(self)
run the wrf or real_nmm
Definition: fcsttask.py:920
def __init__(self, dstore, conf, section, wrf, trakdoms=ALL_DOMS, trakname='ghout_d< domain >', kwargs)
constructor
Definition: fcsttask.py:1513
def fort65_at_time
returns the coupler fort.65 file for a specified time and domain
Definition: fcsttask.py:1130
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def __init__(self, dstore, conf, section, wrf, keeprun=True, kwargs)
constructor
Definition: fcsttask.py:493
def wrfbdy_at_time
returns the wrfbdy file for a specified time and domain
Definition: fcsttask.py:1117
def log
Obtain a logging domain.
Definition: hwrftask.py:425
monitors a running wrf simulation
Definition: wrf.py:1296
def link_fix(self)
links or copies fixed data files
Definition: fcsttask.py:789
def make_wrf(self, wrf)
creates a WRFSimulation that calls its wrfanl files "ghost" files instead.
Definition: fcsttask.py:1520
def get_wrfinput(self)
the wrfinput_d01 output product
Definition: fcsttask.py:1830
def make_wrf(self, wrf)
creates a WRFSimulation for an uncoupled forecast run
Definition: fcsttask.py:1578
def get_ghost(self, domain)
same as get_wrfanl()
Definition: fcsttask.py:1318
def anltime(self)
the time for which analysis files are generated.
Definition: fcsttask.py:1237
inputs
a mapping of typename to a list of input objects
Definition: fcsttask.py:381
def products
iterates over products Iterates over all Product objects that were identified by make_products.
Definition: fcsttask.py:1286
def unrun(self)
deletes output files
Definition: fcsttask.py:1592
def track_domains(self)
iterates over track input domains
Definition: fcsttask.py:1362
def run_exe
runs the executable
Definition: fcsttask.py:861
def deliver_products(self)
delivers products from temporary areas
Definition: fcsttask.py:1835
def use_prep_hybrid(self)
returns True if prep_hybrid was requested, and False otherwise.
Definition: fcsttask.py:715
Links Metgrid data to the current directory for a wrf or real_nmm run.
Definition: fcsttask.py:158
def link_product
helper function that links data
Definition: fcsttask.py:86
def __init__(self, src)
creates a new Input2Fcst with the specified src.
Definition: fcsttask.py:66
def parse_wrf_outname(outname, grid_id, date, nocolons)
generates wrf filenames from templates like construct_filename
Definition: wrfbase.py:21
def initial_prerun(self)
called immediately after cding to a scrub directory
Definition: fcsttask.py:914
def add_input(self, typename, inobj)
add input of a specified type
Definition: fcsttask.py:393
def __init__(self, dstore, conf, section, outdir=None, taskname=None, kwargs)
Creates a FcstTask object.
Definition: fcsttask.py:372
def make_namelist
generates the wrf namelist.input file
Definition: fcsttask.py:968
def make_wrf(self, wrf)
creates the WRFSimulation object (self.wrf())
Definition: fcsttask.py:1209
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
src
the implementation-defined source, used by subclasses
Definition: fcsttask.py:72
prod_wrfinput
the wrfinput output product
Definition: fcsttask.py:1012
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def make_namelist
Writes the namelist for real.
Definition: fcsttask.py:1160
def __init__(self, src, domain)
creates a WRFAnl2WRF for the specified source and domain
Definition: fcsttask.py:249
def get_wrfinput(self)
returns the wrfinput file regardless of the time or domain
Definition: fcsttask.py:1100
runs a short WRF simulation to generate wrfanl files named "ghost"
Definition: fcsttask.py:1300
def run_exe
runs the real_nmm program
Definition: fcsttask.py:1057
def __init__(self, dstore, conf, section, wrf, simlen=None, keeprun=False, kwargs)
runs wrf for an analysis cycle
Definition: fcsttask.py:1687
def postrun(self)
called just after run_exe() succeeds.
Definition: fcsttask.py:953
def deliver_products(self, args, kwargs)
does nothing
Definition: fcsttask.py:1664
def add_wrfanl(self, r, domain)
add a wrfanl input source for a specific domain
Definition: fcsttask.py:767
def wrf_check(self, kwargs)
checks the status of the WRF simulation.
Definition: fcsttask.py:1625
Links real_nmm wrfinput_d01 files the current directory for a wrf run.
Definition: fcsttask.py:183
def make_wrf(self, wrf)
makes the WRFSimulation object for this class.
Definition: fcsttask.py:1307
keeprun
if False, temporary files are deleted, otherwise they're kept
Definition: fcsttask.py:513
def products
iterates over all selected products
Definition: fcsttask.py:1889
state
runs the uncoupled WRF forecast
Definition: fcsttask.py:1606
def get_inputs(self, logger, times, just_check=False, kwargs)
Links prep_hybrid output files for the specified time index.
Definition: fcsttask.py:327
prod_fort65
the output product for the coupler fort.65 input file
Definition: fcsttask.py:1018
def add_wrfbdy(self, r)
adds a wrfbdy_d01 input source
Definition: fcsttask.py:731
def add_metgrid(self, m)
adds a metgrid input source
Definition: fcsttask.py:691
def exproducts(self, kwargs)
iterates over all WRF products.
Definition: fcsttask.py:1617
Links wrfanl or ghost files the current directory for a wrf run.
Definition: fcsttask.py:241
abstract base class of wrf/real_nmm input providers
Definition: fcsttask.py:59
def get_ghost(self, domain)
same as get_wrfanl()
Definition: fcsttask.py:1532
def make_wrf(self, wrf)
creates a WRFSimulation object for this class
Definition: fcsttask.py:1033
def make_products(self)
creates FileProduct objects for all wrfanl files.
Definition: fcsttask.py:1240
Represents a Product created by an external workflow.
Definition: datastore.py:915
Links real_nmm wrfbdy_d01 files the current directory for a wrf run.
Definition: fcsttask.py:287
def deliver_products(self)
delivers products
Definition: fcsttask.py:1442
dt_epsilon
epsilon value for time equality comparisons
Definition: fcsttask.py:512
def get_inputs(self, logger, ftime, just_check=False, kwargs)
Links metgrid data if any is available for the specified time.
Definition: fcsttask.py:168
def make_products(self)
called from constructor, creates Products
Definition: fcsttask.py:781
def need_all_metgrid(self)
returns True if all metgrid files are needed as input to this Task
Definition: fcsttask.py:1044
def make_products(self)
prepares products for deliver_products() and products()
Definition: fcsttask.py:1006
def make_products(self)
creates Product objects for deliver_products() and products()
Definition: fcsttask.py:1373