HWRF  trunk@4391
prep.py
1 """!PrepHybrid runs the prep_hybrid program to transform GFS spectral files to the HWRF grid."""
2 
3 ##@var __all__
4 # The list of symbols exported by "from hwrf.prep import *"
5 __all__=['PrepHybrid']
6 
7 import os
10 import hwrf.input
11 import produtil.rusage
12 
13 from produtil.rusage import setrlimit, rusage, getrlimit
14 from hwrf.exceptions import NoGeogData
15 from hwrf.numerics import to_datetime, to_datetime_rel, to_timedelta, \
16  to_fraction, timedelta_epsilon, TimeMapping, partial_ordering, \
17  to_datetime_rel
18 from produtil.fileop import deliver_file, make_symlink, fortlink, realcwd, \
19  wait_for_files
20 from produtil.run import alias, bigexe, checkrun, openmp
21 from produtil.datastore import FileProduct
22 
24  """!Runs the prep_hybrid program on GFS spectral files.
25 
26  Tracks a sequence of GFS spectral files for a list of times. Runs
27  prep_hybrid on them, one by one. This can be done by calling
28  run(), which runs prep_hybrid for all of them before returning, or
29  runpart() which runs only one before returning. If self.realtime
30  is True, then this class will wait for data to appear."""
31  def __init__(self,dstore,conf,section,wrf,geogdata,atime=None,
32  ftime=None,taskname=None,times=None,inputs=None,
33  in_item=None,in_dataset=None,one_time=False,
34  in_anl_item=None,**kwargs):
35  """!PrepHybrid constructor.
36  @param dstore the produtil.datastore.Datastore database
37  @param conf the hwrf.config.HWRFConfig for config info
38  @param section the section to use in conf
39  @param wrf the hwrf.wrf.WRFSimultion being run
40  @param geogdata the geogrid data product
41  @param atime,ftime analysis and forecast times
42  @param taskname taskname in the dstore
43  @param times prep_hybrid input/output times
44  @param inputs the hwrf.input.DataCatalog with the input data
45  @param in_item,in_dataset Input item and dataset
46  @param in_anl_item alternate input item for analysis time
47  @param one_time If True, only one time is prepped but we lie
48  and say should be used for all times. This is used to trick
49  the HWRF into running with the GDAS 9hr forecast even though
50  a 12hr boundary condition would be needed
51  @param kwargs More keyword arguments are passed to hwrf.hwrftask.HWRFTask.__init__"""
52  if taskname is None: taskname=section
53 
54  super(PrepHybrid,self).__init__(dstore,conf,section,taskname,**kwargs)
55 
56  if times is None:
57  self.times=[ x for x in wrf.bdytimes() ]
58  else:
59  self.times=sorted(times)
60 
61  if inputs is None:
62  hd=self.confstr('catalog','hwrfdata')
63  inputs=hwrf.input.DataCatalog(self.conf,hd,wrf.simstart())
64  self._epsilon=timedelta_epsilon(
65  self.times,default=to_fraction(wrf.bdystep())/10)
66  if in_dataset is None: in_dataset=self.confstr('dataset')
67  self.in_dataset=in_dataset
68  if in_item is None: in_item=self.confstr('item')
69  if in_anl_item is None:
70  in_anl_item=self.confstr('anl_item','')
71  if in_anl_item=='':
72  in_anl_item=in_item
73  self.in_item=in_item
74  self.in_anl_item=in_anl_item
75  if atime is None: atime=wrf.simstart()
76  if ftime is None: ftime=atime
77  self.one_time=one_time
78  self.in_atime=to_datetime(atime)
79  self.in_ftime=to_datetime_rel(ftime,atime)
80  self.geogdata=geogdata
81  self.inputs=inputs
82  self.bdyprod=TimeMapping(self.times)
83  self.logprod=TimeMapping(self.times)
84  self.initprod=None
85  self.wrf=wrf
86 
87  self.make_products()
88  self.nl=hwrf.namelist.Conf2Namelist(conf, self.confstr('namelist'))
89  self.init_namelist()
90 
91  self._prep=None
92 
93  ##@var bdyprod
94  # Mapping from time to boundary output product
95 
96  ##@var initprod
97  # Prep initial state product.
98 
99  ##@var nl
100  # hwrf.namelist.Conf2Namelist with the prep_hybrid namelist
101 
102  ##@var logprod
103  # Mapping from time to prep.log log file product
104 
105  ##@var wrf
106  #the hwrf.wrf.WRFSimultion being run
107 
108  ##@var geogdata
109  #the geogrid data product
110 
111  ##@var in_atime
112  # Analysis time.
113 
114  ##@var in_ftime
115  #Forecast time.
116 
117  ##@var times
118  #prep_hybrid input/output times
119 
120  ##@var inputs
121  # the hwrf.input.DataCatalog with the input data
122 
123  ##@var in_item
124  #hwrf.input.DataCatalog input item
125 
126  ##@var in_dataset
127  #hwrf.input.DataCatalog Input dataset
128 
129  ##@var in_anl_item
130  #alternate input item for analysis time
131 
132  ##@var one_time
133  #If True, only one time is prepped but we lie
134  # and say should be used for all times. This is used to trick
135  # the HWRF into running with the GDAS 9hr forecast even though
136  # a 12hr boundary condition would be needed
137 
138  def inputiter(self):
139  """!Iterates over the list of data needed to run this class."""
140  atime=self.in_atime
141  for t in self.times:
142  if hwrf.numerics.within_dt_epsilon(t,atime,5):
143  in_item=self.in_anl_item
144  else:
145  in_item=self.in_item
146  yield dict(self.taskvars,dataset=self.in_dataset,
147  item=in_item,ftime=t,atime=atime)
148  if self.one_time: break
149  def input_at(self,when):
150  """!Finds the input needed for the specified time.
151 
152  @param when the time of interest
153  @returns the spectral file at that time."""
154  logger=self.log()
155  ftime=to_datetime_rel(self.in_ftime,self.in_atime)
156  if self.one_time:
157  when=ftime
158  else:
159  when=to_datetime_rel(when,ftime)
160  atime=self.in_atime
161  if hwrf.numerics.within_dt_epsilon(when,atime,5):
162  logger.info('Within dt epsilon: %s %s'%(when,atime))
163  in_item=self.in_anl_item
164  else:
165  logger.info('NOT within dt epsilon: %s %s'%(when,atime))
166  in_item=self.in_item
167 
168  self.log().info(
169  "Check for dataset=%s item=%s ftime=%s atime=%s in %s"
170  %(str(self.in_dataset), str(in_item),
171  when.strftime("%Y%m%d%H"),
172  atime.strftime("%Y%m%d%H"), repr(self.inputs) ))
173  return self.inputs.locate(self.in_dataset,in_item,ftime=when,
174  atime=self.in_atime,**self.taskvars)
175  def io_form_geogrid(self):
176  """!Guesses the WRF I/O form that was used to run geogrid
177  @returns the io_form for auxinput2"""
178  return self.wrf.io_form_for('auxinput2')
179  def init_namelist(self):
180  """!Constructs the prep_hybrid namelist in the Conf2Namelist
181  object in self.nl"""
182  # Alias some functions for convenience:
183  siu=self.nl.nl_set_if_unset # set my namelist var if not yet set
184  wg=self.wrf.nl.nl_get # get WRF namelist variable
185  wtg=self.wrf.nl.trait_get # get WRF trait
186  # Set or override several parameters:
187  siu('prmfld','ntimes',1)
188  siu('domain','levels',wg('domains','eta_levels'))
189  siu('domain','ptsgm',float(wg('domains','ptsgm')))
190  siu('domain','p_top_requested',float(wtg('ptop'))/100.0) # Pa->hPa
191  def make_products(self):
192  """!Creates products objects for later file delivery.
193 
194  Creates the produtil.datastore.FileProduct objects for
195  delivery of prep_hybrid output."""
196  first=True
197  outdir=self.outdir
198  ds=self.dstore
199  cat=self.outdir
200  self.initprod=FileProduct(ds,'hwrfinit_0',cat,
201  location=os.path.join(outdir,'hwrfinit_0'))
202  for itime in xrange(len(self.times)):
203  time=self.times[itime]
204  assert(time is not None)
205  self.bdyprod[time]=FileProduct(ds,'hwrfbcs_%d'%(itime,),cat,
206  location=os.path.join(outdir,'hwrfbcs_%d'%(itime,)))
207  self.logprod[time]=os.path.join(outdir,'prep_%d.log'%(itime,))
208  def prep_init_file(self):
209  """!Return the FileProduct for the prep_hybrid initial time output file."""
210  return self.initprod
211  def prep_bdy_files(self):
212  """!Iterates over the FileProduct for the prep_hybrid boundary output files."""
213  for prod in self.bdyprod.itervalues():
214  yield prod
215  def products(self,time=None,name=None):
216  """!Iterates over all output products.
217 
218  Iterates over all products meeting the requirements.
219  @param time the time of interest, or None for all times
220  @param name "init" for initial state, "bdy" for boundary
221  information, or None for all."""
222  if name is None or name=='init':
223  dt=to_timedelta(abs(to_fraction(time-self.wrf.simstart())))
224  if time is None or dt<self._epsilon:
225  yield self.initprod
226  if name is None or name=='bdy':
227  bdys=self.bdyprod
228  if time is None:
229  for (time,prod) in bdys.iteritems(): yield prod
230  else:
231  when=bdys.neartime(time,self._epsilon)
232  if when in bdys: yield bdys[when]
233  def unrun(self):
234  """!Marks all products as unavailable, but does not delete
235  delivered files."""
236  for p in self.products():
237  p.available=0
238  def clean(self):
239  """!Deletes all temporary files (the self.workdir)."""
240  self.rmtree(os.path.join(self.workdir))
241  def run(self,keep=True):
242  """!Preps all times, even if they have already been prepped.
243  @param keep if True, copy output files instead of moving them
244  to the destination."""
245  logger=self.log()
246  self.postmsg('%s: prep is starting'%(self.taskname,))
247  for now in self.times:
248  logger.info('time %s: prep %s'%(now.strftime('%Y%m%d-%H%M%S'),
249  self.input_at(now)))
250  for ipiece in xrange(len(self.times)):
251  self.run_ipiece(ipiece,keep=keep)
252  self.postmsg('%s: prep is completed'%(self.taskname,))
253  def runpart(self,keep=True):
254  """!Preps one time that has not yet been prepped, and returns.
255  @param keep if True, copy output files instead of moving them
256  to the destination."""
257  logger=self.log()
258  for ipiece in xrange(len(self.times)):
259  now=self.times[ipiece]
260  if not self.bdyprod[now].available or ipiece==0 and \
261  not self.initprod.available:
262  logger.info(
263  'time %s (piece %d): not done; process this one'
264  %(now.strftime('%Y%m%d-%H%M%S'),ipiece))
265  try:
266  self.run_ipiece(ipiece,keep=keep)
267  return
268  except Exception as e:
269  logger.warning('time %s (piece %d): %s'%(
270  now.strftime('%Y%m%d-%H%M%S'),ipiece,str(e)),
271  exc_info=True)
272  raise
273  else:
274  self.log().info('time %s (piece %d): already complete'
275  %(now.strftime('%Y%m%d-%H%M%S'),ipiece))
276  def run_ipiece(self,ipiece,keep=True):
277  """!This is the internal helper function that runs the
278  prep_hybrid for self.run and self.runpart.
279  @param ipiece the index of the output time to run
280  @param keep if True, copy output files instead of moving them
281  to the destination."""
282  logger=self.log() # avoid many function calls
283  cenla=self.conf.getfloat('config','domlat')
284  cenlo=self.conf.getfloat('config','domlon')
285  now=self.times[ipiece]
286  prepme=self.input_at(now)
287  assert(isinstance(prepme,basestring))
288  if self.realtime and not wait_for_files(
289  prepme,logger,
290  maxwait=self.confint('max_spectral_wait',1800),
291  sleeptime=self.confint('spectral_sleep_time',20),
292  min_size=self.confint('min_spectral_size',int(4e7)),
293  min_mtime_age=self.confint('min_spectral_age',30),
294  min_atime_age=None,
295  min_ctime_age=None,
296  min_fraction=1.0):
297  msg='Some input spectral files do not exist. Giving up.'
298  logger.error(msg)
300 
301  stime=now.strftime('%Y%m%d-%H%M%S')
302  geoloc=self.geogdata.location
303  if not self.geogdata.available:
304  raise NoGeogData('%s product: WPS geogrid data is not yet '
305  'available. (loc=%s)'%(
306  str(self.geogdata.did), repr(self.geogdata.location)))
307  moad=self.wrf.get_moad()
308  # Decide where to run:
309  there=self.conftimestrinterp('{workdir}/{fYMDH}',
310  ftime=now,workdir=self.workdir)
311  with produtil.cd.NamedDir(there,keep=keep,logger=logger):
312  logger.info('%s: prep in directory %s',stime,realcwd())
313  make_symlink(self.geogdata.location,'geogrid.out',
314  logger=logger,force=True)
315  with open('dloc','wt') as f: f.write("%f\n%f\n"%(cenla,cenlo))
316  with open('itime','wt') as f: f.write("%d\n"%(ipiece,))
317  bdyfile='../hwrfbcs00_%d'%(ipiece,)
318  fortlink({52:bdyfile,
319  11:prepme,
320  44:'itime',
321  45:'dloc', },logger=logger,force=True)
322  if ipiece==0:
323  initfile='../hwrfinit_%d'%(ipiece,)
324  fortlink({53:initfile},logger=logger,force=True)
325  with open('hwrf_na12_mkbnd.parm','wt') as f:
326  f.write(self.nl.make_namelist(section_sorter=partial_ordering(
327  ['rgrid','prmfld','domain'])))
328  imax=self.confint('imax',1440)
329  jmax=self.confint('jmax',721)
330  iof=self.io_form_geogrid()%100
331  if iof==11: iof=2
332  ex=( bigexe(self.getexe('hwrf_prep')).env(
333  IO_FORM=iof,
334  OMP_STACKSIZE="128M",
335  MKL_NUM_THREADS='1' )[
336  moad.nl.nl_get('domains','e_we'),
337  moad.nl.nl_get('domains','e_sn'),
338  moad.nl.nl_get('domains','e_vert'),
339  moad.nl.nl_get('domains','dx'),
340  moad.nl.nl_get('domains','dy'),
341  imax, jmax]
342  < 'hwrf_na12_mkbnd.parm' ) >= 'prep.log'
343  threads=self.confint('threads',0)
344  # Try to change stack limit to 6GB:
345  setrlimit(logger=logger,stack=6e9,ignore=True)
346  # Print all resource limits:
347  getrlimit(logger=logger)
348  # AND collect resource usage during prep:
349  with rusage(logger=logger):
350  if threads<1:
351  logger.info('Use automatic thread count.')
352  ex2=openmp(ex) # use default threads
353  else:
354  # Use specified threads
355  logger.info('Use %d threads'%(threads,))
356  ex2=openmp(ex,threads=threads)
357  MALLOC_CHECK_=self.confint('MALLOC_CHECK_',-999)
358  if MALLOC_CHECK_ != -999:
359  ex2.env(MALLOC_CHECK_=str(MALLOC_CHECK_))
360  checkrun(ex2,logger=logger)
361  if ipiece==0:
362  produtil.fileop.makedirs(os.path.dirname(
363  self.initprod.location))
364  self.initprod.deliver(
365  frominfo=initfile,logger=logger,keep=False)
366  self.bdyprod[now].deliver(
367  frominfo=bdyfile,logger=logger,keep=False)
368  deliver_file('prep.log',self.logprod[now],logger=logger,
369  keep=False)
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
Create namelists, monitor wrf simulations, generate filenames.
Definition: wrf.py:1
Generates a Fortran namelist entirely from config files.
Definition: namelist.py:411
def input_at(self, when)
Finds the input needed for the specified time.
Definition: prep.py:149
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def __init__(self, dstore, conf, section, wrf, geogdata, atime=None, ftime=None, taskname=None, times=None, inputs=None, in_item=None, in_dataset=None, one_time=False, in_anl_item=None, kwargs)
PrepHybrid constructor.
Definition: prep.py:34
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
A subclass of Product that represents file delivery.
Definition: datastore.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
in_anl_item
alternate input item for analysis time
Definition: prep.py:74
initprod
Prep initial state product.
Definition: prep.py:84
Raised when the spectral input files to prep_hybrid do not exist after some specified amount of time...
Definition: exceptions.py:391
one_time
If True, only one time is prepped but we lie and say should be used for all times.
Definition: prep.py:77
def within_dt_epsilon(time1, time2, epsilon)
Returns True if time1 is within epsilon of time2, and False otherwise.
Definition: numerics.py:198
def io_form_geogrid(self)
Guesses the WRF I/O form that was used to run geogrid.
Definition: prep.py:175
bdyprod
Mapping from time to boundary output product.
Definition: prep.py:82
Implements the produtil.run: provides the object tree for representing shell commands.
Definition: prog.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
times
prep_hybrid input/output times
Definition: prep.py:57
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
def run_ipiece
This is the internal helper function that runs the prep_hybrid for self.run and self.runpart.
Definition: prep.py:276
def clean(self)
Deletes all temporary files (the self.workdir).
Definition: prep.py:238
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
Obtains input data needed by various subclasses of hwrf.hwrftask.HWRFTask.
Definition: input.py:1
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
Time manipulation and other numerical routines.
Definition: numerics.py:1
inputs
the hwrf.input.DataCatalog with the input data
Definition: prep.py:81
This module allows querying resource usage and limits, as well as setting resource limits...
Definition: rusage.py:1
in_atime
Analysis time.
Definition: prep.py:78
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def run
Preps all times, even if they have already been prepped.
Definition: prep.py:241
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
Definition: namelist.py:1
def products
Iterates over all output products.
Definition: prep.py:215
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def prep_bdy_files(self)
Iterates over the FileProduct for the prep_hybrid boundary output files.
Definition: prep.py:211
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward comaptibility.
Definition: hwrftask.py:328
def init_namelist(self)
Constructs the prep_hybrid namelist in the Conf2Namelist object in self.nl.
Definition: prep.py:179
in_item
hwrf.input.DataCatalog input item
Definition: prep.py:73
logprod
Mapping from time to prep.log log file product.
Definition: prep.py:83
Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp...
Definition: input.py:109
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def make_products(self)
Creates products objects for later file delivery.
Definition: prep.py:191
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
geogdata
the geogrid data product
Definition: prep.py:80
def unrun(self)
Marks all products as unavailable, but does not delete delivered files.
Definition: prep.py:233
Runs the prep_hybrid program on GFS spectral files.
Definition: prep.py:23
def inputiter(self)
Iterates over the list of data needed to run this class.
Definition: prep.py:138
Raised when the prep_hybrid program cannot find WPS geogrid output data.
Definition: exceptions.py:388
def prep_init_file(self)
Return the FileProduct for the prep_hybrid initial time output file.
Definition: prep.py:208
def taskvars(self)
The dict of object-local values used for string substitution.
Definition: hwrftask.py:243
def runpart
Preps one time that has not yet been prepped, and returns.
Definition: prep.py:253
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
wrf
the hwrf.wrf.WRFSimultion being run
Definition: prep.py:85
in_dataset
hwrf.input.DataCatalog Input dataset
Definition: prep.py:67
nl
hwrf.namelist.Conf2Namelist with the prep_hybrid namelist
Definition: prep.py:88
in_ftime
Forecast time.
Definition: prep.py:79