HWRF  trunk@4391
coupling.py
1 import os, shutil, math
3 import produtil.rusage
5 
6 from produtil.rusage import setrlimit, rusage, getrlimit
7 from produtil.fileop import isnonempty, make_symlink, deliver_file
8 from produtil.run import mpirun, mpi
9 from hwrf.numerics import to_fraction,to_datetime_rel,TimeMapping
10 
11 from produtil.log import jlogger
12 
13 ########################################################################
14 class Component(object):
15  """This is a simple utility class that stores information about
16  coupler components (wrf, pom, hycom, wavewatch3, coupler, etc.)
17  for the CoupledWRF class. You should never instantiate a
18  Component -- only CoupledWRF should do that. It will return one
19  of these when you run CoupledWRF's "component" function.
20 
21  You can safely modify the exe, rankname, rankdefault and initer
22  elements of this class, and it will modify the corresponding
23  information in the CoupledWRF. Do not modify the name and order.
24  Modifying the name or order will break the internal data
25  structures in CoupledWRF"""
26  def __init__(self,name,exe,rankname,rankdefault,initer):
27  """Creates a Component with the given characteristics.
28  Initializes the order to None. The order must be overwritten
29  externally, and is done in CoupledWRF.couple."""
30  (self._name,self.exe,self.rankname,self.rankdefault,self.initer)=\
31  (name,exe,rankname,rankdefault,initer)
32  self._order=None
33  @property
34  def name(self): return self._name
35  @property
36  def order(self): return self._order
37 
38 ########################################################################
39 class ComponentIniter(object):
40  """This class should be subclassed to make coupling component
41  initializers for CoupledWRF. That is, you should implement a
42  subclass and send it to the CoupledWRF.couple(...,initer=). Its
43  job is to provide input data and initialization checks for the
44  various coupling components such as ocean and wave. Note that the
45  CoupledWRF itself initializes the WRF and coupler, so you do not
46  need a ComponentIniter for those components."""
47  def check_coupled_inputs(self,logger):
48  """This function implements a very fast (<1 second) check to
49  see if the components initialization was complete. Returns
50  True on success or False on failure."""
51  raise NotImplementedError(
52  "Subclass forgot to implement check_coupled_inputs.")
53  def link_coupled_inputs(self,just_check,logger):
54  """If just_check is True, then this function implements a more
55  expensive check of whether the initialization for this
56  component succeeded. If just_check is False, then this
57  component should link or copy all files needed to run a
58  coupled forecast. The default implementation calls
59  check_coupled_inputs if just_check is True, and simply returns
60  True if just_check is False. Subclasses should re-implement
61  this function."""
62  if just_check:
63  return check_coupled_inputs(logger)
64  return True
65  def make_exe(self,task,exe,ranks):
66  """Called when it is time to actually run the coupled forecast
67  executables. Returns a produtil.mpiprog.MPIRanksBase for the
68  MPI ranks for this component of the coupling. By default,
69  this returns mpi(task.getexe(exe))*ranks. However, the caller
70  can re-implement this function to replace the executable
71  depending on the results of the initialization. If that is
72  desired, the task.getexe (and other task.get*) should be used
73  so the executable locations can be overridden in the config
74  files."""
75  return mpi(task.getexe(exe))*ranks
76 
77 ########################################################################
79  """Runs the NCEP coupler and WRF, coupled to at least one ocean or
80  wave model. This class is not thread-safe due to how self.couple
81  is implemented."""
82  def __init__(self,dstore,conf,section,wrf,keeprun=True,
83  wrfdiag_stream='auxhist1',**kwargs):
84  super(CoupledWRF,self).__init__(dstore,conf,section,wrf,keeprun,
85  wrfdiag_stream,**kwargs)
86  self.__components=dict()
87  self.__order=list()
88  #self.couple('coupler','hwrf_wm3c','wm3c_ranks',1,None)
89  #self.couple('pom','hwrf_ocean_fcst','pom_ranks',None,None)
90  #self.couple('wrf','hwrf_wrf','wrf_ranks',None,None)
91  self._coupled_products=dict()
92  self.__coupled=None
93  self._default_coupling=True
94 
95  def check_all_inputs(self,coupled=None):
96  """Returns True if all inputs needed to run the forecast are
97  present, and false otherwise. If coupled=True (the default),
98  then coupled components inputs are also checked, otherwise
99  only WRF inputs are checked."""
100  if coupled is not None: self.__coupled=self._default_coupling
101  logger=self.log()
102  okay=super(CoupledWRF,self).check_all_inputs()
103  if okay and coupled:
104  for c in self.coupleiter():
105  if c.initer is not None:
106  okay=c.initer.check_coupled_inputs(logger=self.log())
107  return okay
108 
109  def link_all_inputs(self,just_check=False,coupled=None):
110  """If just_check=True, links or copies inputs required by all
111  components of the coupled simulation. If just_check=False,
112  runs through all steps required to link or copy inputs without
113  actually linking or copying. That mode is intended to be an
114  expensive but more thurough check than check_all_inputs.
115 
116  Returns True if all inputs were linked, and False otherwise."""
117  logger=self.log()
118  okay=super(CoupledWRF,self).link_all_inputs(
119  just_check=just_check)
120  if just_check:
121  if okay:
122  logger.info('WRF inputs are present.')
123  else:
124  logger.error('FAIL: WRF inputs are missing.')
125  if coupled is None: coupled=self.__coupled
126  if coupled is None: coupled=self._default_coupling
127  if not coupled:
128  logger.info(
129  'Not copying ocean, wave and coupler inputs because an '
130  'uncoupled simulation was requested.')
131  return okay
132  if not just_check:
133  self.make_coupler_namelist('cpl_nml')
134  for c in self.coupleiter():
135  if c.initer is not None:
136  okay=c.initer.link_coupled_inputs(just_check,
137  logger=logger)
138  if not okay:
139  msg='%s inputs are missing.'%(c.name,)
140  logger.error(msg)
141  if not just_check:
143  return False
144  else:
145  logger.info('%s inputs are all present.'%(c.name,))
146  return okay
147 
148  def run(self,coupled=None):
149  """Runs the coupled simulation. Sets the internal coupled
150  vs. uncoupled flag to the specified value. Default: True
151  (coupled)."""
152  if coupled is None: coupled=self._default_coupling
153  self.__coupled=coupled
154  super(CoupledWRF,self).run()
155 
156  def run_exe(self):
157  """Runs the MPI command for the coupled coupled simulation.
158  Do not call this directly: call self.run instead."""
159  logger=self.log()
160  setrlimit(logger=logger,stack=6e9,ignore=True)
161  if self.__coupled is None: self.__coupled=self._default_coupling
162  if self.__coupled:
163  logger.info('Starting the coupled wrf simulation')
164  cmd=None
165  for c in self.coupleiter():
166  logger.info(
167  'Component #%d is %s: exe=%s ranks from %s (default %s)'
168  %(c.order,c.name,c.exe,c.rankname,repr(c.rankdefault)))
169  exe=c.exe
170  ranks=self.confint(c.rankname,c.rankdefault)
171  if not isinstance(ranks,int):
172  raise TypeError(
173  'Somehow ended up with a non-int for ranks in '
174  'CoupledWRF.run_exe. Check inputs. Ranks is a %s '
175  '%s.'%(type(ranks).__name__,repr(ranks)))
176  if not ranks>0:
177  raise ValueError(
178  'Ranks (%d) is not >0 in CoupledWRF.run_exe. Check '
179  'config files and scripts.'%ranks)
180  if c.initer is not None:
181  mpiified=c.initer.make_exe(self,exe,ranks)
182  else:
183  mpiified=mpi(self.getexe(exe))*ranks
184  cmd = mpiified if(cmd is None) else cmd+mpiified
185  if cmd is None:
187  'No coupled components specified in CoupledWRF.'
188  ' You must call CoupledWRF.couple(...)')
189  #logfile=self.confstrinterp('{WORKhwrf}/cpl.out')
190  logfile=self.confstrinterp('{coupled_log}')
191  jlogger.info('%s: will log coupled forecast stdout here'%(
192  logfile,))
193  cmd=mpirun(cmd) > logfile
194  #assert(isnonempty('pom.nml'))
195  if not isnonempty('cpl_nml'):
197  'Logic error: somehow the cpl_nml is empty or missing.')
198  super(CoupledWRF,self).run_exe('wrf',None,cmd)
199  else:
200  logger.info('Starting the uncoupled wrf simulation')
201  super(CoupledWRF,self).run_exe('wrf',False,None)
202 
203  def make_coupler_namelist(self,filename='cpl_nml'):
204  """Makes the namelist for the NCEP Coupler"""
205  logger=self.log()
206  logger.info('section is %s'%(self.section,))
207 
208  cplsec=self.confstr('cpl_nml')
209  dt_c=self.confint('dt_c')
210  f_dt_c=to_fraction(dt_c)
211  simlen=to_fraction(self.sim.simend()-self.sim.simstart())
212  cstepmax=int(math.ceil(float(simlen/f_dt_c)))
213  morevars=dict(cstepmax=cstepmax, dt_c=dt_c)
214 
215  logger.info('dt_c=%s f_dt_c=%s simlen=%s cstepmax=%s'%(
216  repr(dt_c),repr(f_dt_c),repr(simlen),repr(cstepmax)))
217 
219  conf=self.conf,section=cplsec,morevars=morevars)
220  with open(filename,'wt') as cn:
221  nmlstr=cplnml.make_namelist(morevars=morevars)
222  cn.write(nmlstr)
223 
224  ####################################################################
225  # Component manipulation
226  def coupleiter(self):
227  """Iterates over all Component objects that describe coupling
228  components."""
229  i=-1
230  for name in self.__order:
231  i+=1
232  c=self.__components[name]
233  yield c
234  def couple(self,name,exe,rankname,rankdefault=None,initer=None):
235  """Adds the specified coupling component. Returns self.
236  name -- a user-defined name of this component, must be a string.
237  exe -- option in the [exe] section with the executable path
238  rankname -- name of the option in this task's section that
239  has the number of ranks
240  rankdefault -- number of ranks if the rankname option is
241  missing or empty. Can be None, in which case the
242  rankname option must be set.
243  initer -- the object that will link inputs ot the working
244  directory before the forecast begins. Can be None.
245  Note that the superclass, WRFAtmos, initializes the WRF
246  component and this class initializes the coupler, so you can
247  pass None for those components' "initer" objects.
248 
249  This subroutine is not thread-safe."""
250  if not isinstance(name,basestring):
251  raise TypeError(
252  'The "name" argument to CoupledWRF.couple must be a string. '
253  'You passed a %s %s.'%(type(name).__name__,repr(name)))
254  if name in ('output','history','restart','restartin','input',
255  'inputout') or name.find('auxhist')==0 or \
256  name.find('auxinput')==0:
257  raise ValueError(
258  'Component name cannot be the same as any WRF stream name: '
259  'output, history, restart, restartin, input, inputout, '
260  'auxhist* or auxinput*.')
261  if rankdefault is not None and not isinstance(rankdefault,int):
262  raise TypeError(
263  'The "rankdefault" argument to CoupledWRF.couple must be '
264  'None or an int. You passed a %s %s.'
265  %(type(rankdefault).__name__,repr(rankdefault)))
266  if not isinstance(exe,basestring):
267  raise TypeError(
268  'The "exe" argument to CoupledWRF.couple must be a string. '
269  'You plassed a %s %s.'%(type(exe).__name__,repr(exe)))
270  if not isinstance(rankname,basestring):
271  raise TypeError(
272  'The "rankname" argument to CoupledWRF.couple must be '
273  'a string. You plassed a %s %s.'
274  %(type(rankname).__name__,repr(rankname)))
275  rd=None if (rankdefault is None) else str(rankdefault)
276  if name not in self.__components:
277  self.__order.append(name)
278  c=Component(name,exe,rankname,rankdefault,initer)
279  c._order=self.order(name)
280  self.__components[name]=c
281  return self
282 
283  def remove_wave(): pass
284  def remove_ocean(): pass
285 
286  def uncouple(self,which=None):
287  """!Removes a component, or all components, from the coupling.
288  @param which the name of the component to remove. If None or
289  unspecified, run totally uncoupled.
290  @returns self"""
291  if which is None:
292  # Uncouple everything
293  self._default_coupling=False
294  return self
295  del self.__components[which]
296  order=list()
297  for w in self.__order:
298  if w!=which:
299  order.append(w)
300  self.__order=order
301  for c in self.coupleiter():
302  c._order=self.order(c.name)
303  if len(order) == 2:
304  # Special case: only coupler and wrf are left.
305  self.uncouple()
306  return self
307 
308  def component(self,which):
309  """Returns information about the component with the specified
310  name (a string) or at the specified zero-based index (an
311  int). Returns a mutable object of class Component."""
312  if isinstance(which,basestring):
313  return self.__components[which]
314  elif isinstance(which,int):
315  name=self.__order[which]
316  return self.__components[name]
317  else:
318  raise TypeError('CoupledWRF.component requires an int or string '
319  'argument. You passed a %s %s.'%
320  (type(which).__name__,repr(which)))
321  def order(self,name):
322  """Components of coupling are ordered by where they show up in
323  the MPI execution. The NCEP coupler must be first. This
324  function returns the zero-based order. Hence, if one does:
325 
326  self.couple("coupler",...).couple("hycom",...).couple("wrf",...)
327 
328  then:
329 
330  self.order("coupler") = 0
331  self.order("hycom") = 1
332  self.order("wrf") = 2
333 
334  Other values will raise KeyError."""
335  i=-1
336  for o in self.__order:
337  i+=1
338  if o==name: return i
339  raise KeyError(name)
340  ####################################################################
341  # Products
342  def add_coupled_stream(self,stream,times):
343  """Adds a new non-WRF output stream."""
344  if stream in ('output','history','restart','restartin','input',
345  'inputout') or stream.find('auxhist')==0 or \
346  stream.find('auxinput')==0:
347  raise ValueError(
348  'Component stream name (%s) cannot be the same as any WRF '
349  'stream name: output, history, restart, restartin, input, '
350  'inputout, auxhist* or auxinput*.'%(stream,))
351  if stream not in self._coupled_products:
352  a=self.sim.simstart()
353  timerel=[to_datetime_rel(t,a) for t in times]
354  self._coupled_products[stream]=TimeMapping(timerel,list)
355  return self
356  def add_coupled_product(self,stream,time,product):
357  """Adds a product for a non-WRF output stream at a given time."""
358  self._coupled_products[stream][time].append(product)
359  return self
360  def coupled_products(self,stream=None,time=None):
361  """Iterates over non-WRF products."""
362  if stream is None:
363  if time is None:
364  for (s,tprod) in self._coupled_products.iteritems():
365  for (t,prods) in tprod.iteritems():
366  for p in prods: yield p
367  else:
368  for (s,tprod) in self._coupled_products.iteritems():
369  prods=tprod.get(time,None)
370  if prods is None: continue
371  for p in prods: yield p
372  elif stream in self._coupled_products:
373  tprod=self._coupled_products[stream]
374  if time is None:
375  for (t,prods) in tprod.iteritems():
376  for p in prods: yield p
377  else:
378  prods=tprod.get(time,None)
379  if prods is not None:
380  for p in prods: yield p
381 
382  def products(self,domains=None,stream=None,time=None,**kwargs):
383  """Iterates over all products, both WRF and non-WRF."""
384  if stream is None:
385  coupled=True
386  wrf=True
387  else:
388  coupled=stream in self._coupled_products
389  wrf=not coupled
390 
391  if wrf:
392  for p in super(CoupledWRF,self).products(
393  domains=domains,stream=stream,time=time,**kwargs):
394  yield p
395  if coupled:
396  for p in self.coupled_products(stream,time):
397  yield p
398 
399 ########################################################################
400 class CouplingStatus(object):
401  """Maintains the ocean and wave status files in the COM directory."""
402  def __init__(self,conf,section,logger=None):
403  self.__conf=conf
404  self.__section=section
405  if not isinstance(section,basestring):
406  raise TypeError('The section argument to CouplingStatus.__init__ must be a string, not a %s %s.'%(type(section).__name__,repr(section)))
407  if logger is None:
408  logger=conf.log(section)
409  self.__logger=logger
410  @property
411  def logger(self):
412  """The logging.Logger to use for logging messages."""
413  return self.__logger
414  @property
415  def conf(self):
416  """The configuration object, a subclass of hwrf.config.HWRFConfig"""
417  return self.__conf
418  @property
419  def section(self):
420  """The section in self.conf to use for configuration information."""
421  return self.__section
422  def unset(self,logger=None):
423  """Delete the coupling status files. If the logger is
424  not specified, the section name is used for the logging domain"""
425  if logger is None: logger=self.logger
426  for ocstat in ( self.section, self.section+'2' ):
427  ocstat=self.conf.getstr(self.section,ocstat)
428  ocstatfile=os.path.join(self.conf.getdir('com'),ocstat)
429  produtil.fileop.remove_file(ocstatfile,info=True,logger=logger)
430  def fileiter(self):
431  yield self.conf.strinterp(self.section,'{'+self.section+'}')
432  yield self.conf.strinterp(self.section,'{'+self.section+'2}')
433  def set(self,coupling_flag,logger=None,morelines=None):
434  """Set RUN_COUPLED=YES (true) or =NO (false) depending on the
435  value of coupled_flag. If the logger is not specified, the
436  section name is used as the logging domain."""
437  if logger is None: logger=self.log()
438  coupling_flag=bool(coupling_flag)
439  for ocstatfile in self.fileiter():
440  strflag='YES' if coupling_flag else 'NO'
441  logger.info('Setting RUN_COUPLED=%s in status file %s'%(
442  strflag,ocstatfile))
443  with open(ocstatfile,'wt') as f:
444  f.write('RUN_COUPLED='+strflag+"\n")
445  if morelines is not None:
446  for line in morelines:
447  logger.info('%s: write %s'%(ocstatfile,line))
448  f.write(line+'\n')
449  def read(self,logger=None):
450  """Reads the first coupling status file (identified by {section}=)
451  and returns the contents as an array of lines."""
452  if logger is None: logger=self.logger
453  ocstatfile=self.conf.strinterp(self.section,'{'+self.section+'}')
454  if not os.path.exists(ocstatfile):
455  return list()
456  with open(ocstatfile,'rt') as f:
457  lines=f.readlines(2**20)
458  return lines
459 
460  def get(self,logger=None):
461  """Checks the coupling status file. If the file does not exist or
462  cannot be opened or read, then False is returned. Otherwise, the
463  file is scanned for RUN_COUPLED=YES or RUN_COUPLED=NO (case
464  insensitive). The last of those RUN_COUPLED lines is used:
465  NO=return False, YES=return True. If the logger is not specified,
466  the section name is used as the logging domain"""
467  if logger is None: logger=self.logger
468  ocstatfile=self.conf.strinterp(self.section,'{'+self.section+'}')
469  success=None
470 
471  logger.info('%s: scan status file for RUN_COUPLED=YES or NO'%(
472  ocstatfile))
473 
474  try:
475  with open(ocstatfile,'rt') as f:
476  for line in f:
477  if line.upper().find('RUN_COUPLED=YES')>=0:
478  success=True
479  logger.info(
480  'Status file says: RUN_COUPLED=YES')
481  elif line.upper().find('RUN_COUPLED=NO')>=0:
482  success=False
483  logger.warning(
484  'Status file says: RUN_COUPLED=NO')
485  except EnvironmentError as e:
486  logger.error(
487  'Error checking status file: %s'
488  %(str(e),),exc_info=True)
489  except Exception as ee:
490  logger.error(
491  'Unhandled exception while checking status file: %s'
492  %(str(ee),),exc_info=True)
493  raise
494 
495  if success is None:
496  logger.warning('Could not scan status file for RUN_COUPLED=YES'
497  ' or NO. Assuming RUN_COUPLED=NO.')
498  success=False
499  elif success:
500  logger.info(
501  'RUN_COUPLED=YES: status file says coupled component init succeeded.')
502  else:
503  logger.warning(
504  'RUN_COUPLED=NO: status file says coupled component init failed.')
505 
506  return success
507 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
Generates a Fortran namelist entirely from config files.
Definition: namelist.py:411
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def remove_file
Deletes the specified file.
Definition: fileop.py:251
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
def component(self, which)
Definition: coupling.py:308
def add_coupled_stream(self, stream, times)
Products.
Definition: coupling.py:342
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def make_exe(self, task, exe, ranks)
Definition: coupling.py:65
Runs the real_nmm or wrf executables.
Definition: fcsttask.py:1
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
def check_coupled_inputs(self, logger)
Definition: coupling.py:47
The uncoupled HWRF forecast Task.
Definition: fcsttask.py:1544
Time manipulation and other numerical routines.
Definition: numerics.py:1
This module allows querying resource usage and limits, as well as setting resource limits...
Definition: rusage.py:1
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def log
Obtain a logging domain.
Definition: hwrftask.py:425
Raised when the ocean init did not produce some expected outputs.
Definition: exceptions.py:118
Configures logging.
Definition: log.py:1
def coupleiter(self)
Component manipulation.
Definition: coupling.py:226
def products(self, domains=None, stream=None, time=None, kwargs)
Definition: coupling.py:382
def __init__(self, name, exe, rankname, rankdefault, initer)
Definition: coupling.py:26
def order(self, name)
Definition: coupling.py:321
Maps from an ordered list of times to arbitrary data.
Definition: numerics.py:746
def uncouple
Removes a component, or all components, from the coupling.
Definition: coupling.py:286
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def add_coupled_product(self, stream, time, product)
Definition: coupling.py:356
def link_coupled_inputs(self, just_check, logger)
Definition: coupling.py:53