HWRF  trunk@4391
1 """!PrepHybrid runs the prep_hybrid program to transform GFS spectral files to the HWRF grid."""
3 ##@var __all__
4 # The list of symbols exported by "from hwrf.prep import *"
5 __all__=['PrepHybrid']
7 import os
10 import hwrf.input
11 import produtil.rusage
13 from produtil.rusage import setrlimit, rusage, getrlimit
14 from hwrf.exceptions import NoGeogData
15 from hwrf.numerics import to_datetime, to_datetime_rel, to_timedelta, \
16  to_fraction, timedelta_epsilon, TimeMapping, partial_ordering, \
17  to_datetime_rel
18 from produtil.fileop import deliver_file, make_symlink, fortlink, realcwd, \
19  wait_for_files
20 from produtil.run import alias, bigexe, checkrun, openmp
21 from produtil.datastore import FileProduct
24  """!Runs the prep_hybrid program on GFS spectral files.
26  Tracks a sequence of GFS spectral files for a list of times. Runs
27  prep_hybrid on them, one by one. This can be done by calling
28  run(), which runs prep_hybrid for all of them before returning, or
29  runpart() which runs only one before returning. If self.realtime
30  is True, then this class will wait for data to appear."""
31  def __init__(self,dstore,conf,section,wrf,geogdata,atime=None,
32  ftime=None,taskname=None,times=None,inputs=None,
33  in_item=None,in_dataset=None,one_time=False,
34  in_anl_item=None,**kwargs):
35  """!PrepHybrid constructor.
36  @param dstore the produtil.datastore.Datastore database
37  @param conf the hwrf.config.HWRFConfig for config info
38  @param section the section to use in conf
39  @param wrf the hwrf.wrf.WRFSimultion being run
40  @param geogdata the geogrid data product
41  @param atime,ftime analysis and forecast times
42  @param taskname taskname in the dstore
43  @param times prep_hybrid input/output times
44  @param inputs the hwrf.input.DataCatalog with the input data
45  @param in_item,in_dataset Input item and dataset
46  @param in_anl_item alternate input item for analysis time
47  @param one_time If True, only one time is prepped but we lie
48  and say should be used for all times. This is used to trick
49  the HWRF into running with the GDAS 9hr forecast even though
50  a 12hr boundary condition would be needed
51  @param kwargs More keyword arguments are passed to hwrf.hwrftask.HWRFTask.__init__"""
52  if taskname is None: taskname=section
54  super(PrepHybrid,self).__init__(dstore,conf,section,taskname,**kwargs)
56  if times is None:
57  self.times=[ x for x in wrf.bdytimes() ]
58  else:
59  self.times=sorted(times)
61  if inputs is None:
62  hd=self.confstr('catalog','hwrfdata')
63  inputs=hwrf.input.DataCatalog(self.conf,hd,wrf.simstart())
64  self._epsilon=timedelta_epsilon(
65  self.times,default=to_fraction(wrf.bdystep())/10)
66  if in_dataset is None: in_dataset=self.confstr('dataset')
67  self.in_dataset=in_dataset
68  if in_item is None: in_item=self.confstr('item')
69  if in_anl_item is None:
70  in_anl_item=self.confstr('anl_item','')
71  if in_anl_item=='':
72  in_anl_item=in_item
73  self.in_item=in_item
74  self.in_anl_item=in_anl_item
75  if atime is None: atime=wrf.simstart()
76  if ftime is None: ftime=atime
77  self.one_time=one_time
78  self.in_atime=to_datetime(atime)
79  self.in_ftime=to_datetime_rel(ftime,atime)
80  self.geogdata=geogdata
81  self.inputs=inputs
82  self.bdyprod=TimeMapping(self.times)
83  self.logprod=TimeMapping(self.times)
84  self.initprod=None
85  self.wrf=wrf
87  self.make_products()
88  self.nl=hwrf.namelist.Conf2Namelist(conf, self.confstr('namelist'))
89  self.init_namelist()
91  self._prep=None
93  ##@var bdyprod
94  # Mapping from time to boundary output product
96  ##@var initprod
97  # Prep initial state product.
99  ##@var nl
100  # hwrf.namelist.Conf2Namelist with the prep_hybrid namelist
102  ##@var logprod
103  # Mapping from time to prep.log log file product
105  ##@var wrf
106  #the hwrf.wrf.WRFSimultion being run
108  ##@var geogdata
109  #the geogrid data product
111  ##@var in_atime
112  # Analysis time.
114  ##@var in_ftime
115  #Forecast time.
117  ##@var times
118  #prep_hybrid input/output times
120  ##@var inputs
121  # the hwrf.input.DataCatalog with the input data
123  ##@var in_item
124  #hwrf.input.DataCatalog input item
126  ##@var in_dataset
127  #hwrf.input.DataCatalog Input dataset
129  ##@var in_anl_item
130  #alternate input item for analysis time
132  ##@var one_time
133  #If True, only one time is prepped but we lie
134  # and say should be used for all times. This is used to trick
135  # the HWRF into running with the GDAS 9hr forecast even though
136  # a 12hr boundary condition would be needed
138  def inputiter(self):
139  """!Iterates over the list of data needed to run this class."""
140  atime=self.in_atime
141  for t in self.times:
142  if hwrf.numerics.within_dt_epsilon(t,atime,5):
143  in_item=self.in_anl_item
144  else:
145  in_item=self.in_item
146  yield dict(self.taskvars,dataset=self.in_dataset,
147  item=in_item,ftime=t,atime=atime)
148  if self.one_time: break
149  def input_at(self,when):
150  """!Finds the input needed for the specified time.
152  @param when the time of interest
153  @returns the spectral file at that time."""
154  logger=self.log()
155  ftime=to_datetime_rel(self.in_ftime,self.in_atime)
156  if self.one_time:
157  when=ftime
158  else:
159  when=to_datetime_rel(when,ftime)
160  atime=self.in_atime
161  if hwrf.numerics.within_dt_epsilon(when,atime,5):
162  logger.info('Within dt epsilon: %s %s'%(when,atime))
163  in_item=self.in_anl_item
164  else:
165  logger.info('NOT within dt epsilon: %s %s'%(when,atime))
166  in_item=self.in_item
168  self.log().info(
169  "Check for dataset=%s item=%s ftime=%s atime=%s in %s"
170  %(str(self.in_dataset), str(in_item),
171  when.strftime("%Y%m%d%H"),
172  atime.strftime("%Y%m%d%H"), repr(self.inputs) ))
173  return self.inputs.locate(self.in_dataset,in_item,ftime=when,
174  atime=self.in_atime,**self.taskvars)
175  def io_form_geogrid(self):
176  """!Guesses the WRF I/O form that was used to run geogrid
177  @returns the io_form for auxinput2"""
178  return self.wrf.io_form_for('auxinput2')
179  def init_namelist(self):
180  """!Constructs the prep_hybrid namelist in the Conf2Namelist
181  object in self.nl"""
182  # Alias some functions for convenience:
183  siu=self.nl.nl_set_if_unset # set my namelist var if not yet set
184  wg=self.wrf.nl.nl_get # get WRF namelist variable
185  wtg=self.wrf.nl.trait_get # get WRF trait
186  # Set or override several parameters:
187  siu('prmfld','ntimes',1)
188  siu('domain','levels',wg('domains','eta_levels'))
189  siu('domain','ptsgm',float(wg('domains','ptsgm')))
190  siu('domain','p_top_requested',float(wtg('ptop'))/100.0) # Pa->hPa
191  def make_products(self):
192  """!Creates products objects for later file delivery.
194  Creates the produtil.datastore.FileProduct objects for
195  delivery of prep_hybrid output."""
196  first=True
197  outdir=self.outdir
198  ds=self.dstore
199  cat=self.outdir
200  self.initprod=FileProduct(ds,'hwrfinit_0',cat,
201  location=os.path.join(outdir,'hwrfinit_0'))
202  for itime in xrange(len(self.times)):
203  time=self.times[itime]
204  assert(time is not None)
205  self.bdyprod[time]=FileProduct(ds,'hwrfbcs_%d'%(itime,),cat,
206  location=os.path.join(outdir,'hwrfbcs_%d'%(itime,)))
207  self.logprod[time]=os.path.join(outdir,'prep_%d.log'%(itime,))
208  def prep_init_file(self):
209  """!Return the FileProduct for the prep_hybrid initial time output file."""
210  return self.initprod
211  def prep_bdy_files(self):
212  """!Iterates over the FileProduct for the prep_hybrid boundary output files."""
213  for prod in self.bdyprod.itervalues():
214  yield prod
215  def products(self,time=None,name=None):
216  """!Iterates over all output products.
218  Iterates over all products meeting the requirements.
219  @param time the time of interest, or None for all times
220  @param name "init" for initial state, "bdy" for boundary
221  information, or None for all."""
222  if name is None or name=='init':
223  dt=to_timedelta(abs(to_fraction(time-self.wrf.simstart())))
224  if time is None or dt<self._epsilon:
225  yield self.initprod
226  if name is None or name=='bdy':
227  bdys=self.bdyprod
228  if time is None:
229  for (time,prod) in bdys.iteritems(): yield prod
230  else:
231  when=bdys.neartime(time,self._epsilon)
232  if when in bdys: yield bdys[when]
233  def unrun(self):
234  """!Marks all products as unavailable, but does not delete
235  delivered files."""
236  for p in self.products():
237  p.available=0
238  def clean(self):
239  """!Deletes all temporary files (the self.workdir)."""
240  self.rmtree(os.path.join(self.workdir))
241  def run(self,keep=True):
242  """!Preps all times, even if they have already been prepped.
243  @param keep if True, copy output files instead of moving them
244  to the destination."""
245  logger=self.log()
246  self.postmsg('%s: prep is starting'%(self.taskname,))
247  for now in self.times:
248  logger.info('time %s: prep %s'%(now.strftime('%Y%m%d-%H%M%S'),
249  self.input_at(now)))
250  for ipiece in xrange(len(self.times)):
251  self.run_ipiece(ipiece,keep=keep)
252  self.postmsg('%s: prep is completed'%(self.taskname,))
253  def runpart(self,keep=True):
254  """!Preps one time that has not yet been prepped, and returns.
255  @param keep if True, copy output files instead of moving them
256  to the destination."""
257  logger=self.log()
258  for ipiece in xrange(len(self.times)):
259  now=self.times[ipiece]
260  if not self.bdyprod[now].available or ipiece==0 and \
261  not self.initprod.available:
262  logger.info(
263  'time %s (piece %d): not done; process this one'
264  %(now.strftime('%Y%m%d-%H%M%S'),ipiece))
265  try:
266  self.run_ipiece(ipiece,keep=keep)
267  return
268  except Exception as e:
269  logger.warning('time %s (piece %d): %s'%(
270  now.strftime('%Y%m%d-%H%M%S'),ipiece,str(e)),
271  exc_info=True)
272  raise
273  else:
274  self.log().info('time %s (piece %d): already complete'
275  %(now.strftime('%Y%m%d-%H%M%S'),ipiece))
276  def run_ipiece(self,ipiece,keep=True):
277  """!This is the internal helper function that runs the
278  prep_hybrid for self.run and self.runpart.
279  @param ipiece the index of the output time to run
280  @param keep if True, copy output files instead of moving them
281  to the destination."""
282  logger=self.log() # avoid many function calls
283  cenla=self.conf.getfloat('config','domlat')
284  cenlo=self.conf.getfloat('config','domlon')
285  now=self.times[ipiece]
286  prepme=self.input_at(now)
287  assert(isinstance(prepme,basestring))
288  if self.realtime and not wait_for_files(
289  prepme,logger,
290  maxwait=self.confint('max_spectral_wait',1800),
291  sleeptime=self.confint('spectral_sleep_time',20),
292  min_size=self.confint('min_spectral_size',int(4e7)),
293  min_mtime_age=self.confint('min_spectral_age',30),
294  min_atime_age=None,
295  min_ctime_age=None,
296  min_fraction=1.0):
297  msg='Some input spectral files do not exist. Giving up.'
298  logger.error(msg)
301  stime=now.strftime('%Y%m%d-%H%M%S')
302  geoloc=self.geogdata.location
303  if not self.geogdata.available:
304  raise NoGeogData('%s product: WPS geogrid data is not yet '
305  'available. (loc=%s)'%(
306  str(self.geogdata.did), repr(self.geogdata.location)))
307  moad=self.wrf.get_moad()
308  # Decide where to run:
309  there=self.conftimestrinterp('{workdir}/{fYMDH}',
310  ftime=now,workdir=self.workdir)
311  with produtil.cd.NamedDir(there,keep=keep,logger=logger):
312  logger.info('%s: prep in directory %s',stime,realcwd())
313  make_symlink(self.geogdata.location,'geogrid.out',
314  logger=logger,force=True)
315  with open('dloc','wt') as f: f.write("%f\n%f\n"%(cenla,cenlo))
316  with open('itime','wt') as f: f.write("%d\n"%(ipiece,))
317  bdyfile='../hwrfbcs00_%d'%(ipiece,)
318  fortlink({52:bdyfile,
319  11:prepme,
320  44:'itime',
321  45:'dloc', },logger=logger,force=True)
322  if ipiece==0:
323  initfile='../hwrfinit_%d'%(ipiece,)
324  fortlink({53:initfile},logger=logger,force=True)
325  with open('hwrf_na12_mkbnd.parm','wt') as f:
326  f.write(self.nl.make_namelist(section_sorter=partial_ordering(
327  ['rgrid','prmfld','domain'])))
328  imax=self.confint('imax',1440)
329  jmax=self.confint('jmax',721)
330  iof=self.io_form_geogrid()%100
331  if iof==11: iof=2
332  ex=( bigexe(self.getexe('hwrf_prep')).env(
333  IO_FORM=iof,
334  OMP_STACKSIZE="128M",
335  MKL_NUM_THREADS='1' )[
336  moad.nl.nl_get('domains','e_we'),
337  moad.nl.nl_get('domains','e_sn'),
338  moad.nl.nl_get('domains','e_vert'),
339  moad.nl.nl_get('domains','dx'),
340  moad.nl.nl_get('domains','dy'),
341  imax, jmax]
342  < 'hwrf_na12_mkbnd.parm' ) >= 'prep.log'
343  threads=self.confint('threads',0)
344  # Try to change stack limit to 6GB:
345  setrlimit(logger=logger,stack=6e9,ignore=True)
346  # Print all resource limits:
347  getrlimit(logger=logger)
348  # AND collect resource usage during prep:
349  with rusage(logger=logger):
350  if threads<1:
351  logger.info('Use automatic thread count.')
352  ex2=openmp(ex) # use default threads
353  else:
354  # Use specified threads
355  logger.info('Use %d threads'%(threads,))
356  ex2=openmp(ex,threads=threads)
357  MALLOC_CHECK_=self.confint('MALLOC_CHECK_',-999)
358  if MALLOC_CHECK_ != -999:
359  ex2.env(MALLOC_CHECK_=str(MALLOC_CHECK_))
360  checkrun(ex2,logger=logger)
361  if ipiece==0:
362  produtil.fileop.makedirs(os.path.dirname(
363  self.initprod.location))
364  self.initprod.deliver(
365  frominfo=initfile,logger=logger,keep=False)
366  self.bdyprod[now].deliver(
367  frominfo=bdyfile,logger=logger,keep=False)
368  deliver_file('prep.log',self.logprod[now],logger=logger,
369  keep=False)
