HWRF  trunk@4391
gsi.py
1 """!Runs the GSI data assimilation on the HWRF system."""
2 
3 ##@var __all__
4 # The list of symbols exported by "from hwrf.gsi import *"
5 __all__ = ['GSI_DATA_TYPES', 'GSIBase', 'FGATGSI' ]
6 
7 import os, shutil, glob, gzip
11 import datetime
12 import produtil.rusage
13 
14 from produtil.rusage import setrlimit, rusage, getrlimit
15 from produtil.datastore import COMPLETED
16 from produtil.cd import NamedDir
17 from produtil.fileop import make_symlink, deliver_file, isnonempty, \
18  make_symlinks_in
19 from produtil.run import mpirun, mpi, openmp, checkrun, bigexe, runsync
20 from hwrf.numerics import to_datetime,to_datetime_rel, to_fraction, \
21  to_timedelta
22 from hwrf.exceptions import GSIInputError
23 
24 ##@var GSI_DATA_TYPES
25 # List of all known GSI datastypes
26 GSI_DATA_TYPES = [
27  "hirs2_n14", "msu_n14", "sndr_g08", "sndr_g11", "sndr_g12", "sndr_g13",
28  "sndr_g08_prep", "sndr_g11_prep", "sndr_g12_prep", "sndr_g13_prep",
29  "sndrd1_g11", "sndrd2_g11", "sndrd3_g11", "sndrd4_g11", "sndrd1_g12",
30  "sndrd2_g12", "sndrd3_g12", "sndrd4_g12", "sndrd1_g13", "sndrd2_g13",
31  "sndrd3_g13", "sndrd4_g13", "sndrd1_g14", "sndrd2_g14", "sndrd3_g14",
32  "sndrd4_g14", "sndrd1_g15", "sndrd2_g15", "sndrd3_g15", "sndrd4_g15",
33  "hirs3_n15", "hirs3_n16", "hirs3_n17", "amsua_n15", "amsua_n16",
34  "amsua_n17", "amsub_n15", "amsub_n16", "amsub_n17", "hsb_aqua",
35  "airs_aqua", "amsua_aqua", "imgr_g08", "imgr_g11", "imgr_g12",
36  "imgr_g14", "imgr_g15", "pcp_ssmi_dmsp", "pcp_tmi_trmm", "conv",
37  "sbuv2_n16", "sbuv2_n17", "sbuv2_n18", "sbuv2_n19", "gome_metop-a",
38  "omi_aura", "mls_aura", "ssmi_f13", "ssmi_f14", "ssmi_f15",
39  "hirs4_n18", "hirs4_metop-a", "amsua_n18", "amsua_metop-a", "mhs_n18",
40  "mhs_metop-a", "amsre_low_aqua", "amsre_mid_aqua", "amsre_hig_aqua",
41  "ssmis_f16", "ssmis_f17", "ssmis_f18", "ssmis_f19", "ssmis_f20",
42  "iasi_metop-a", "hirs4_n19", "amsua_n19", "mhs_n19", "seviri_m08",
43  "seviri_m09", "seviri_m10", "cris_npp", "atms_npp", "hirs4_metop-b",
44  "amsua_metop-b", "mhs_metop-b", "iasi_metop-b", "gome_metop-b" ]
45 
47  """!Base class of anything that runs the GSI. Do not use directly."""
48 
49  ##@var used_regional_ensemble
50  # Was the regional ensemble used for forecast error covariances?
51 
52  ##@var global_ensemble_size
53  # The number of members in the global (parent model) ensemble
54 
55  ##@var hybrid_da
56  # Was hybrid 3DVAR-ENKF data assimilation used?
57 
58  def __init__(self,dstore,conf,section,domain,wrf_in_prod,sim,
59  taskname=None,atime=None,parent_atime=None,
60  enkf_domains=None,ensda=None,**kwargs):
61  """!The GSIBase constructor:
62  @param dstore passed to Datum: the Datastore object for this Task
63  @param conf the conf object for this task (passed to HWRFTask)
64  @param section the conf section for this task (passed to HWRFTask)
65  @param domain the WRFDomain for this GSI. Must have been
66  initialized by a WRFSimulation
67  @param wrf_in_prod the Product for the wrfinput_d01 or
68  ghost_d0* file for that domain
69  @param sim the hwrf.wrf.WRFSimulation that will be run as the forecast
70  @param taskname Optional: the taskname for this product in the datastore
71  @param atime the analysis time as a datetime.datetime.
72  Default: conf.cycle
73  @param parent_atime the analysis time of the parent model. This is
74  relevant if the parent model's forecast is used as the
75  background.
76  @param enkf_domains a list of WRF domains that should be copied
77  from the hwrf.ensda.EnsembleDA.
78  @param ensda a subclass of hwrf.ensda.DAEnsemble that provides
79  regional ensemble forecasts to generate the forecast error
80  covariance
81  @param kwargs ignored; passed to HWRFTask"""
82  super(GSIBase,self).__init__(dstore,conf,section,taskname=taskname,
83  **kwargs)
84  taskname=self.taskname
85  self._ensda=ensda
86  if atime is None: atime=conf.cycle
87  if parent_atime is None:
88  parent_atime=to_datetime_rel(-6*3600,atime)
89  self._atime=to_datetime(atime)
90  self._parent_atime=to_datetime(parent_atime)
91  self._domain=domain
92  self._wrf_in_prod=wrf_in_prod
93  self._sim=sim
94  self._enkf_domains=list()
95  self._tdrflagfile=os.path.join(self.conf.getdir('com'), \
96  self.icstr('{stormlabel}.tdr'))
99  self.hybrid_da=True
100  if enkf_domains is not None:
101  for domain in enkf_domains:
102  self._enkf_domains.append(domain)
103  assert(wrf_in_prod is not None)
104  assert(wrf_in_prod.location is not None)
105  assert(wrf_in_prod.location!='')
106 
107  # Generate the FileProduct object for our output file:
108  with dstore.transaction() as t:
109  prodname='gsi_out_'+self.domain.name
110  prod=produtil.datastore.FileProduct(dstore,prodname,taskname,
111  location=os.path.join(self.outdir,prodname))
112  self._wrf_out_prod=prod
113 
114  # Get the datasets and items for our data grabbers:
115  def getc(what,default):
116  if what in kwargs:
117  return str(kwargs[what])
118  s=self.confstr(what,'')
119  if s is not None and s!='':
120  return s
121  return default
122  #self._sigma_dataset=getc('sigma_dataset','gfs')
123  #self._sigma_item=getc('sigma_item','gfs_sf')
124  self._sigma_dataset=getc('sigma_dataset','gdas1')
125  self._sigma_item=getc('sigma_item','gdas1_sf')
126  self._bufr_dataset=getc('bufr_dataset','gfs')
127  self._bufr_item=getc('bufr_item',self._bufr_dataset+'_bufr')
128  self._prepbufr_item=getc('prepbufr_item','prepbufr_nr')
129  self._enkf_dataset=getc('enkf_dataset','enkf')
130  self._enkf_item=getc('enkf_item','enkf_sfg')
131  self._biascr_dataset=getc('biascr_dataset','gdas1')
132  self._biascr_item=getc('biascr_item','gdas1_biascr')
133  self._biascr_pc_item=getc('biascr_pc_item','gdas1_biascr_pc')
134  self._abias_item=getc('abias_item','gdas1_abias')
135  self._satang_item=getc('satang_item','gdas1_satang')
136 
137  # Get the DataCatalog for our data grabbers:
138  self._in_catalog=None
139  incat_name=None
140  if 'in_catalog' in kwargs:
141  ink=kwargs['in_catalog']
142  if isinstance(ink,hwrf.input.DataCatalog):
143  self._in_catalog=ink
144  elif isinstance(ink,basestring):
145  incat_name=ink
146  elif ink is None: pass
147  else:
148  raise TypeError(
149  'In hwrf.gsi.GSIBase.__init__, in_catalog must be None, '
150  'a basestring or a DataCatalog. You provided an object '
151  'of type %s, with value %s.'
152  %(type(ink).__name__,repr(ink)))
153  if self._in_catalog is None:
154  if incat_name is None:
155  incat_name=self.confstr('catalog')
157  self.conf,incat_name,self.parent_atime)
158 
159  @property
160  def wrf_top_Pa(self):
161  """!The WRF model top in pascals"""
162  p_top_requested=self._sim.nl.nl_get('domains','p_top_requested')
163  return float(p_top_requested)
164 
165  @property
166  def atime(self):
167  """!The analysis time of this GSI."""
168  return self._atime
169 
170  @property
171  def parent_atime(self):
172  """!Parent model analysis time.
173 
174  The analysis time of the parent data assimilation models
175  (ie.: GDAS, GFS ENKF). This is usually six hours before
176  self.atime."""
177  return self._parent_atime
178 
179  def set_ensda(self,ensda,enkf_domains):
180  """!Sets the hwrf.ensda.DAEnsemble to use, and the enkf domains.
181 
182  Specifies the ensemble to use for forecast error covariances,
183  and the list of hwrf.wrf.WRFDomain domains that should be
184  input to the GSI.
185  @param ensda the hwrf.ensda.DAEnsemble that provides short
186  simulation output, for the forecast error covariances
187  @param enkf_domains an iterable of enkf domains, WRFDomain
188  objects, identifying which domains in ensda should be used."""
189  self._enkf_domains=list()
190  self._ensda=ensda
191  for dom in enkf_domains:
192  self._enkf_domains.append(dom)
193 
194  def inputiter(self):
195  """!Iterate over needed inputs.
196 
197  Iterates over all files external to this workflow that are
198  required to run the GSI. This may include the GFS ENKF,
199  prepbufr, bufr, and other files. This is used by the
200  hwrf.input module to obtain those inputs."""
201  # Request GFS ENKF:
202  maxmemb=self.confint('num_enkf',80)
203  if maxmemb>0:
204  fhour=self.conffloat('enkf_fhr',6.0)
205  enkf_age=self.conffloat('enkf_age_hr',6.0)
206  my_atime=self.atime
207  atime=to_datetime_rel(-enkf_age*3600.0,my_atime)
208  ftime=to_datetime_rel(3600*fhour,atime)
209  for zmemb in xrange(maxmemb):
210  imemb=1+zmemb
211  yield dict(self.taskvars,dataset=self._enkf_dataset,
212  item=self._enkf_item,atime=atime,enkfmem=imemb,
213  ftime=ftime)
214 
215  # Request bias correction and/or satellite angle files:
216  parent_atime=self.parent_atime
217  if self.confbool('use_newradbc',False):
218  yield dict(self.taskvars,dataset=self._biascr_dataset,
219  item=self._biascr_item,ftime=parent_atime,
220  atime=parent_atime)
221  yield dict(self.taskvars,dataset=self._biascr_dataset,
222  item=self._biascr_pc_item,ftime=parent_atime,
223  atime=parent_atime)
224  else:
225  if self._abias_item:
226  yield dict(self.taskvars,dataset=self._biascr_dataset,
227  item=self._abias_item,ftime=parent_atime,
228  atime=parent_atime)
229  if self._satang_item:
230  yield dict(self.taskvars,dataset=self._biascr_dataset,
231  item=self._satang_item,ftime=parent_atime,
232  atime=parent_atime)
233 
234  # Request prepbufr file:
235  atime=self.atime
236  yield dict(self.taskvars,dataset=self._bufr_dataset,
237  item=self._prepbufr_item,atime=atime)
238 
239  # Request bufr files:
240  olist=self.confstr('obstypes')
241  touched=set()
242  for section in olist.split(','):
243  trim=section.strip()
244  if len(trim)<=0 or trim in touched: continue
245  dataset=self.conf.get(section,'dataset')
246  item=self.conf.get(section,'item')
247  otype=self.conf.get(section,'type').lower()
248  if otype=='satellite' and not self.confbool('sat_radiance_da',True):
249  continue
250  if otype=='satwnd' and not self.confbool('sat_wnd_da',True):
251  continue
252  for (localname,obstype) in self.conf.items(section):
253  if localname in ['dataset','item','type']:
254  # These are not obs types, so skip them.
255  continue
256  yield dict(self.taskvars,dataset=dataset,item=item,
257  obstype=obstype,atime=atime,optional=True)
258  # NOTE: optional=True, so the exhwrf_input will
259  # continue if a bufr file is missing.
260 
261  # GDAS data:
262  atime=self.parent_atime
263  for hr in self.parent_fhrs():
264  ftime=atime+datetime.timedelta(hours=hr)
265  yield dict(self.taskvars,dataset=self._sigma_dataset,
266  item=self._sigma_item,atime=atime,ftime=ftime)
267 
268  def grab_enkf_input(self):
269  """!Link or copy ensemble inputs.
270 
271  Links or copies the output of the last ENKF or hwrf.ensda
272  cycle. Calls grab_wrf_enkf() if any enkf_domains were given
273  to the constructor. Otherwise, calls grab_gfs_enkf() to get
274  the global enkf. Will also revert to grab_gfs_enkf() if the
275  ensda should be used, but was unavailable."""
276  self.used_regional_ensemble=False
277  use_hwrf_ensemble=self.confbool('use_hwrf_ensemble',False)
278  ensda_when=self.confstr('ensda_when','tdr_next_cycle').lower()
279  if use_hwrf_ensemble and len(self._enkf_domains)>0 and \
280  self._ensda is not None and (isnonempty(self._tdrflagfile)
281  or ensda_when=='always'):
283  if not self.used_regional_ensemble:
284  self.grab_gfs_enkf()
285 
286  def grab_bufr(self,atime=None,morevars=None):
287  """!Link bufr files.
288 
289  Links or copies all needed bufr files to the local directory.
290  If sat_da is False, satellite obs will be omitted.
291  @param atime the analysis time to use when specifying the
292  required bufr files
293  @param morevars additional variables to pass for string
294  replacement when expanding bufr filenames in
295  configuration (hwrf.config.HWRFConfig) sections."""
296  olist=self.confstr('obstypes')
297  touched=set()
298  for osection in olist.split(','):
299  trim=osection.strip()
300  if len(trim)>0 and not trim in touched:
301  self.grab_obstype_section(trim,atime=atime,morevars=morevars)
302 
303  def grab_obstype_section(self,section,atime=None,morevars=None):
304  """!Copies or links observations.
305 
306  @param section the obstype section to read
307  @param atime the atime for string expansion when finding bufr files
308  @param morevars more variables for string expansion when
309  finding bufr files
310 
311  Copies or links observations specified in the obstype sections
312  of the configuration file to the current working directory.
313 
314  The section listed in self.section should contain an "obstype"
315  option, whose value is a comma separated list of section
316  names. This method reads every section in that list. For example,
317 
318  @code{.unformatted}
319  [gsi_d02]
320  catalog = {input_catalog}
321  ...
322  obstypes = hdob_obstype,sat_obstypes,tdr_new_obstype
323  ...
324  [sat_obstypes]
325  type=satellite
326  dataset=gfs
327  item=gfs_bufr
328  gsnd1bufr=goesfv
329  amsuabufr=1bamua
330  satwndbufr=satwnd
331  gpsrobufr=gpsro
332  ...
333  [hdob_obstype]
334  ...
335  [tdr_new_obstype]
336  ...
337  @endcode
338 
339  For each section, the option keys are the local directory
340  filenames expected by GSI, while the values are the data type
341  part of the operational filename (ie.: the satwind in
342  gfs.t12z.tm00.satwind.bufr_d). There are a few special keys:
343 
344  dataset - the name of the dataset for hwrf.input purposes
345  item - the name of the item for hwrf.input purposes
346  type - the type of observation: satellite, or anything else.
347  At present, only "satellite" has any special meaning.
348 
349  If the type is "satellite" then the entire section will be
350  skipped if sat_radiance_da=False in this task's config section.
351 
352  If the type is "satwnd" then the entire section will be
353  skipped if sat_wnd_da=False in this task's config section.
354 
355  Once the section is parsed, the files are all linked to this
356  directory. """
357  logger=self.log()
358  if not isinstance(section,basestring): section=str(section)
359 
360  if atime is None:
361  atime=self.atime
362  else:
363  atime=to_datetime_rel(atime,self.atime)
364 
365  dataset=self.conf.get(section,'dataset')
366  item=self.conf.get(section,'item')
367  otype=self.conf.get(section,'type').lower()
368 
369  logger.warning('process obs section %s with dataset=%s item=%s '
370  'type=%s'%(section,dataset,item,otype))
371 
372  if otype=='satellite':
373  if self.confbool('sat_radiance_da',True):
374  logger.warning('%s: satellite radiance DA '
375  'is enabled. Continuing...'%(section,))
376  else:
377  logger.warning('%s: satellite radiance DA is disabled in hwrf.conf. '
378  'Not linking bufr files from this section.'
379  %(section,))
380  return
381 
382  if otype=='satwnd':
383  if self.confbool('sat_wnd_da',True):
384  logger.warning('%s: assimilation of satellite wind '
385  'is enabled. Continuing...'%(section,))
386  else:
387  logger.warning('%s: assimilation of satellite wind is disabled in hwrf.conf. '
388  'Not linking bufr files from this section.'
389  %(section,))
390  return
391 
392  obstypes=list()
393  items=self.conf.items(section)
394  otdict=dict( [ (v,k) for k,v in items ] )
395  namer=lambda f,t: otdict[t]
396 
397  for localname,obstype in items:
398  if localname in ['dataset','item','type']: continue
399  obstypes.append(obstype)
400 
401  for obstype in obstypes:
402  logger.warning('Find obstype=%s in dataset=%s item=%s'
403  %(obstype,dataset,item))
404  if not isinstance(obstype,basestring):
405  raise TypeError(
406  'In gsi.GSIBase.link_bufr, the obstypes parameter must '
407  'be an iterable container of basestrings. One of the '
408  'elements was a %s (value %s) instead.'
409  %(type(obstype).__name__,repr(obstype)))
410  if dataset == 'tdr' and bool(self.realtime):
411  ds=os.path.join(self.getdir('intercom'),'bufrprep')
412  it=self._in_catalog.parse(item,atime=atime,
413  logger=logger,obstype='tldplr')
414  there=os.path.join(ds,it)
415  else:
416  there=self._in_catalog.locate(dataset,item,atime=atime,
417  logger=logger,obstype=obstype)
418  if there is None or there=='':
419  msg='%s: Could not find a location for this obstype.'\
420  %(obstype,)
421  logger.warning(msg)
422  if self.confbool('require_all_bufrs',False):
423  raise GSIInputError(msg)
424  elif produtil.fileop.isnonempty(there):
425  bn=os.path.basename(there)
426  on=namer(bn,obstype)
427  if dataset != 'tdr' or isnonempty(self._tdrflagfile):
428  make_symlink(there,on,logger=logger,force=True)
429  else:
430  msg='%s: Observation file is empty or non-existent: %s'\
431  %(obstype,there)
432  logger.warning(msg)
433  if self.confbool('require_all_bufrs',False):
434  raise GSIInputError(msg)
435 
436 
437  def grab_prepbufr(self,atime=None,**kwargs):
438  """!Links or copies the prepbufr file to the local directory.
439 
440  @param atime the analysis time, or time relative to
441  self.atime. Used for string expansion in the
442  hwrf.config.HWRFConfig.
443  @param kwargs also passed to the hwrf.config.HWRFConfig
444  for string expansion"""
445  if atime is None:
446  atime=self.atime
447  else:
448  atime=to_datetime_rel(atime,self.atime)
449  logger=self.log()
450  there=self._in_catalog.locate(self._bufr_dataset,self._prepbufr_item,
451  atime=atime,logger=logger,**kwargs)
452  if self.conf.getint('bufrprep','prepbufrprep',0) == 0:
453  there=self._in_catalog.locate(self._bufr_dataset,self._prepbufr_item,
454  atime=atime,logger=logger,**kwargs)
455  else:
456  ds=os.path.join(self.getdir('intercom'),'bufrprep')
457  it=self._in_catalog.parse(self._prepbufr_item,
458  atime=atime,logger=logger,**kwargs)
459  there=os.path.join(ds,it)
460  if there is None or there=='':
461  msg='Could not find the prepbufr file (item=%s dataset=%s)' \
462  %(repr(self.prepbufr_item),repr(self._bufr_dataset))
463  logger.warning(msg)
464  raise GSIInputError(msg)
465  elif not produtil.fileop.isnonempty(there):
466  msg=there+': is non-existent or empty'
467  logger.error(msg)
468  raise GSIInputError(msg)
469  make_symlink(there,'prepbufr',logger=logger,force=True)
470 
471  def write_vitals(self,filename='tcvitl'):
472  """!Writes the tcvitals (from self.storminfo) to the specified
473  file.
474  @param filename the file to receive the tcvitals"""
475  logger=self.log()
476  logger.info('Writing tcvitals to %s'%(repr(filename),))
477  with open(filename,'wt') as f:
478  f.write(self.storminfo.as_tcvitals()+"\n")
479  assert(os.path.exists(filename))
480 
481  def wrfout_copier(self,file):
482  """!Generate the wrfout file converter.
483 
484  Returns the "copier" argument to deliver_file to use to copy
485  the specified file. Will be None, unless the file is HDF5, in
486  which case it will be "ncks -6 source target" to decompress
487  and convert to NetCDF3 style (uncompressed) with 64-bit
488  indexing. If ncks is missing, such a conversion is
489  impossible, so None is returned.
490 
491  @param file the file that is to be copied"""
492  logger=self.log()
493  ncks=self.getexe('ncks','')
494  if not ncks:
495  logger.warning('ncks path not specified in conf [exe] '
496  'so I will search your $PATH')
497  ncks=produtil.fileop.find_exe('ncks',raise_missing=False)
498  if ncks:
499  def copy(s,t,x):
500  produtil.fileop.remove_file(t,logger=logger)
501  if self.confbool('sync_frequently',True):
502  runsync()
503  checkrun(bigexe(ncks)['-6',s,t]<'/dev/null',logger=logger)
504  return copy
505  else:
506  return None
507 
508  def grab_wrf_enkf(self,ensda):
509  """!Links the WRF ENKF files to this directory.
510  @param ensda the hwrf.ensda.DAEnsemble that provides the files"""
511  logger=self.log()
512  logger.info('in grab_wrf_enkf')
513  nameme=dict()
514  plist=list()
515  did=None
516  for (ens,member) in ensda.members_at_time(self.atime):
517  logger.info('ens,member = %s,%s'%(repr(ens),repr(member)))
518  iens=int(ens)
519  for domain in self._enkf_domains:
520  logger.info('ens,member,domain = %s,%s,%s'
521  %(repr(ens),repr(member),repr(domain)))
522  did=int(domain.get_grid_id())
523  if self.conf.getbool('config','run_ens_relocation'):
524  if domain.is_moad():
525  continue
526  else:
527  prod=member.rstage3.get_wrfout(domain=domain)
528  else:
529  if domain.is_moad():
530  continue
531  else:
532  prod=member.get_wrfanl(domain=domain,atime=self.atime)
533  if prod is None:
534  logger.info('No product for domain %s.'%(str(domain),))
535  continue
536  logger.info('domain %s prod %s'%(str(domain),prod.did))
537  nameme[prod]='wrf_en%03d'%iens
538  plist.append(prod)
539  assert(did is not None)
540  def renamer(p,l): return nameme[p]
541  def copy(fromfile,tofile,copier):
542  deliver_file(fromfile,tofile,logger=logger,keep=True,copier=copier)
543  def link(fromfile,tofile):
544  produtil.fileop.make_symlink(fromfile,tofile,logger=logger)
545 
546  workpool=None
547 
548  def actor(p,n,l):
549  fromfile=p.location
550  l.info('Link %s to %s'%(fromfile,n))
551  if produtil.fileop.netcdfver(fromfile)=='HDF5':
552  l.info('%s: file is HDF5. I will assume your GSI was built with support for compressed NetCDF4'%(fromfile,))
553  copier=None
554  else:
555  l.info('%s: file is NetCDF3.'%(fromfile,))
556  copier=None
557 
558  if copier is None:
559  workpool.add_work(link,[fromfile,n])
560  else:
561  workpool.add_work(copy,[fromfile,n,copier])
562 
563  maxtime=self.confint('maxwait',240)
564  if not self.realtime: maxtime=30
565 
566  with produtil.workpool.WorkPool(4) as wp:
567  workpool=wp
569  plist,logger,renamer,actor,maxtime=maxtime)
570  workpool.barrier()
571 
572  wanted=len(plist)
573  if count<wanted:
574  logger.critical('Found only %d of %d WRF ENKF files - '
575  'using GFS ENKF instead.'%(count,wanted))
576  return False
577  self.postmsg('Found all %d of %d WRF ENKF files.'%(count,wanted))
578  return True
579 
580  def grab_gfs_enkf(self,atime=None,**kwargs):
581  """!Links the GFS ENKF files to this directory
582  @param atime the analysis time, or time relative to
583  self.atime. Used for string expansion in the
584  hwrf.config.HWRFConfig.
585  @param kwargs also passed to the hwrf.config.HWRFConfig
586  for string expansion"""
587  logger=self.log()
588  maxmemb=self.confint('num_enkf',80)
589  if maxmemb==0:
590  logger.warning('Number of ENKF members requested is 0. Will '
591  'not link anything.')
592  return
593 
594  fhour=self.conffloat('enkf_fhr',6.0)
595  enkf_age=self.conffloat('enkf_age_hr',6.0)
596  my_atime=self.atime
597  assert(my_atime is not None)
598  atime=to_datetime_rel(-enkf_age*3600.0,my_atime)
599  ftime=to_datetime_rel(3600*fhour,atime)
600  with open('filelist06','wt') as fl:
601  localmemb=0
602  for zmemb in xrange(maxmemb):
603  imemb=1+zmemb
604  there=self._in_catalog.locate(self._enkf_dataset,
605  self._enkf_item,atime=atime,logger=logger,
606  enkfmem=imemb,ftime=ftime,**kwargs)
607  if not produtil.fileop.isnonempty(there):
608  if not bool(self.realtime):
609  raise GSIInputError('required input file '
610  'is empty or non-existent: %s'
611  %(there,))
612  else:
613  logger.warning(there+': is empty or non-existent')
614  else:
615  localmemb=localmemb+1
616  local=self.conftimestrinterp(
617  'sfg_{aYMDH}_fhr{fahr:02d}s_mem{imemb:03d}',
618  atime=atime,ftime=ftime,imemb=localmemb)
619  produtil.fileop.make_symlink(there,local,force=True,
620  logger=logger)
621  fl.write(local+'\n')
622  self.global_ensemble_size=localmemb
623  if localmemb < 40:
624  logger.info('Ensemble member is less than 40!!!'
625  'Will run 3DVAR instead of hybrid analysis')
626  self.hybrid_da=False
627  else:
628  self.hybrid_da=True
629 
630  def copy_wrf_inout(self,filename='wrf_inout'):
631  """!Copies the WRF analysis or input file to the specified
632  filename
633  @param filename the file to receive the data"""
634  logger=self.log()
635  def namer(p,logger,*args): return filename
636  def actor(p,name,logger,*args):
637  deliver_file(p.location,name,logger=logger)
639  logger,namer,actor)
640 
641  def get_ghost(self,domain):
642  """!Obtain output ghost product for the specified domain.
643 
644  If this GSI is being run on the specified domain, returns the
645  output product of GSI, otherwise returns None.
646  @param domain the WRFDomain of interest"""
647  if domain==self.domain:
648  return self._wrf_out_prod
649  return None
650 
651  def get_wrfanl(self,domain):
652  """!Obtain output ghost product for the specified domain.
653 
654  If this GSI is being run on the specified domain, returns
655  the output product of GSI, otherwise returns None.
656  @param domain the WRFDomain of interest"""
657  if domain==self.domain:
658  return self._wrf_out_prod
659  return None
660 
661  def get_wrfinput(self):
662  """!Obtain output wrfinput data for the outermost WRF domain.
663 
664  If this GSI is being run on the WRF outermost domain (Mother
665  Of All Domains, or MOAD), returns the output product of GSI.
666  Otherwise, returns None."""
667  if self.domain.is_moad():
668  return self._wrf_out_prod
669  else:
670  return None
671  @property
672  def domain(self):
673  """!The WRF domain for which GSI is being run."""
674  return self._domain
675 
676  def products(self,domains=None,prodtype='wrf_out_prod'):
677  """!Iterates over all output products of this Task.
678  @param domains the domains of interest
679  @param prodtype ignored"""
680  if domains is None:
681  yield self._wrf_out_prod
682  else:
683  for domain in domains:
684  if domain==self.domain:
685  yield self._wrf_in_prod
686  break
687 
688  def make_gsi_namelist(self,filename='gsiparm.anl'):
689  """!Creates the GSI namelist in the specified file.
690  @param filename the destination filename"""
691  logger=self.log()
692  invars=dict()
693 
694  use_gfs_stratosphere=self.confbool('use_gfs_stratosphere',True)
695  if self.wrf_top_Pa < 201.0 and self.confbool('sat_radiance_da',True):
696  invars.update( USE_GFS_STRATOSPHERE=use_gfs_stratosphere,
697  USE_GFS_OZONE=True,
698  REGIONAL_OZONE=False,
699  NDAT=80 )
700  else:
701  invars.update( USE_GFS_STRATOSPHERE=False,
702  USE_GFS_OZONE=False,
703  REGIONAL_OZONE=False,
704  NDAT=16 )
705 
706  if not self.confbool('use_newradbc',False):
707  SETUP='''upd_pred(1)=0,upd_pred(2)=0,upd_pred(3)=0,
708  upd_pred(4)=0,upd_pred(5)=0,upd_pred(6)=0,
709  upd_pred(7)=0,upd_pred(8)=0,upd_pred(9)=0,
710  upd_pred(10)=0,upd_pred(11)=0,upd_pred(12)=0,'''
711  else:
712  SETUP='''newpc4pred=.true., adp_anglebc=.true., angord=4,
713  passive_bc=.false., use_edges=.false., emiss_bc=.true.,
714  diag_precon=.true., step_start=1.e-3, upd_pred(1)=0,
715  upd_pred(2)=0,upd_pred(3)=0,upd_pred(4)=0,
716  upd_pred(5)=0,upd_pred(6)=0,upd_pred(7)=0,
717  upd_pred(8)=0,upd_pred(9)=0,upd_pred(10)=0,
718  upd_pred(11)=0,upd_pred(12)=0,'''
719 
720  dom=self._sim[self._domain]
721  nx=dom.nx
722  assert(nx)
723  ny=dom.ny
724  assert(ny)
725  nz=dom.nz
726  assert(nz)
727  invars.update(NLAT=ny-1, NLON=nx-1, NETCDF=True, LEVS=nz-1,
728  NLON_ENS_REGIONAL=nx-1,SETUP=SETUP,
729  NLAT_ENS_REGIONAL=ny-1)
730  invars.update(tdrtype='tldplrbufr')#FIXME: tdrtype
731  invars.update(gps_dtype='gps_bnd')
732 
733  singleobstest=self.confbool('singleobstest',False)
734  invars.update(ONEOBSTEST=singleobstest)
735 
736  if self.used_regional_ensemble:
737  logger.info('Using regional ensemble.')
738  invars.update( MERGE_TWO_GRID_ENSPERTS=False,
739  REGIONAL_ENSEMBLE_OPTION=2,
740  ENSEMBLE_SIZE_REGIONAL=self._ensda.nmembers)
741  else:
742  logger.info('Using global ensemble.')
743  invars.update( HYBENS_REGIONAL=self.hybrid_da,
744  ENSEMBLE_SIZE_REGIONAL=self.global_ensemble_size)
745 
746  nml_file=self.confstr('nml_file')
747  nml_section=self.confstr('nml_section',self.section)
748  atime=self.atime
749  ni=hwrf.namelist.NamelistInserter(self.conf,nml_section)
750 
751  with open(nml_file,'rt') as nf:
752  with open(filename,'wt') as of:
753  of.write(ni.parse(nf,logger=logger,source=nml_file,
754  raise_all=True,atime=atime,**invars))
755 
756  def after_gsi(self):
757  """!Called by run() after the gsi executable completes.
758 
759  This is intended to be overridden by subclasses to perform
760  some action after gsi is complete, but before products are
761  delivered. The default implementation does nothing."""
762 
763  def before_gsi(self):
764  """!Called by run() just before running the gsi program.
765 
766  This is intended to be overridden by subclasses to perform
767  some action after all inputs needed for gsi are available, but
768  before gsi starts. The default implementation does nothing."""
769 
770  def grab_more_inputs(self):
771  """!Called by run() to obtain additional inputs before before_gsi()
772 
773  This is intended to be overridden by subclasses to copy or
774  link more inputs for GSI. The default implementation does
775  nothing."""
776 
777  def grab_bias_satang(self):
778  """!Copies or links bias correction and satellite angle files"""
779  logger=self.log()
780  #parent_atime=self._atime
781  parent_atime=self.parent_atime
782  def get(ds,it,fn):
783  logger.info('%s: search for this at dataset=%s item=%s time=%s'
784  %(fn,ds,it,parent_atime.strftime('%Y%m%d%H')))
785  there=self._in_catalog.locate(ds,it,atime=parent_atime,
786  logger=logger,ftime=parent_atime)
787  logger.info('%s: found at %s'%(fn,there))
788  make_symlink(there,fn,force=True,logger=logger)
789  if self.confbool('use_newradbc',False):
790  get(self._biascr_dataset,self._biascr_item,'satbias_in')
791  get(self._biascr_dataset,self._biascr_pc_item,'satbias_pc')
792  else:
793  get(self._biascr_dataset,self._abias_item,'satbias_in')
794  get(self._biascr_dataset,self._satang_item,'satbias_angle')
795 
796  def run_gsi_exe(self):
797  """!Runs the actual GSI executable."""
798  logger=self.log()
799  if self.confbool('sync_frequently',True):
800  runsync()
801  cmd = (mpirun(mpi(self.getexe('gsi')),allranks=True)\
802  .env(OMP_STACKSIZE='128M') < 'gsiparm.anl')
803  # Redirection is mandatory for the GSI so we can process the
804  # output later on.
805  cmd = cmd > 'stdout'
806 
807  logger.warning(repr(cmd))
808  # Send a message at the highest logging level before and after
809  # GSI so the NCEP-wide jlogfile has a message about the status
810  # of GSI.
811  self.postmsg('Starting GSI for %s domain'%(str(self.domain),))
812 
813  sleeptime=self.conffloat('sleeptime',30.0)
814  ntries=0
815  maxtries=2
816  while ntries<maxtries:
817  try:
818  ntries+=1
819  getrlimit(logger=logger)
820  with rusage(logger=logger):
821  checkrun(cmd,sleeptime=sleeptime)
822  break
823  except Exception as e:
824  if ntries>=maxtries:
825  logger.critical('GSI failed for %s domain: %s'
826  %(str(self.domain),str(e)),exc_info=True)
827  raise
828  else:
829  logger.warning('GSI failed for %s domain: %s, will retry %d more time(s)'
830  %(str(self.domain),str(e),maxtries-ntries),
831  exc_info=True)
832  self.postmsg('GSI succeeded for %s domain'%(str(self.domain),))
833 
834  def deliver_products(self):
835  """!Delivers output products.
836 
837  This function is called by run() to deliver output files to
838  the intercom or com directory and record in the database that
839  they are delivered."""
840  d=os.path.dirname(self._wrf_out_prod.location)
841  logger=self.log()
842  produtil.fileop.makedirs(d,logger=logger)
843  self._wrf_out_prod.deliver(frominfo='wrf_inout',keep=False,
844  logger=logger)
846  'satbias_out',os.path.join(self.outdir,'satbias_out'),
847  keep=False,logger=logger)
848 
849  def make_diag_files(self,tgtpre,nthreads):
850  """!Creates GSI diagnostic files.
851 
852  Makes some diagnostic files and copies them to the specified
853  delivery location. Part of this routine is threaded: specify
854  the number of worker threads in nthreads. Minimum is 1.
855  @param tgtpre the prefix to the output names, including the full path
856  @param nthreads the maximum number of threads to use"""
857  assert(nthreads>=1)
858  assert(isinstance(tgtpre,basestring))
859  assert(os.path.isabs(tgtpre))
860 
861  logger=self.log()
862 
863  # This section makes the many diagnostic *.gz files in worker
864  # threads. The actual work is done in _make_diag_for.
865  logger.info('make diag *.gz files')
866  with produtil.workpool.WorkPool(nthreads,logger) as workers:
867 
868  # Request that the workers make the stdout.anl:
869  workers.add_work(self._make_stdout_anl,[tgtpre,logger])
870 
871  # Request that the workers deliver ensemble spread
872  workers.add_work(self._make_ensemble_spread,[tgtpre,logger])
873 
874  # Request that the workers make the diag*gz files for all
875  # data types:
876  for dtype in GSI_DATA_TYPES:
877  workers.add_work(self._make_diag_for,[tgtpre,dtype,logger])
878 
879  workers.barrier() # wait for work to complete
880 
881  def _make_stdout_anl(self,tgtpre,logger):
882  """! Concatenates many "stdout" files into one.
883 
884  Do not call this function directly; it is part of the
885  implementation of make_diag_files. It concatenates "stdout"
886  and fort.2* to a single log file.
887  @param tgtpre the prefix to the output names, including the full path
888  @param logger a logging.Logger to use for logging messages"""
889 
890  logger.info('stdout.anl: make this from stdout and fort.2*')
891  with open('stdout.anl','wb') as outf:
892  logger.info('stdout.anl: stdout')
893  with open('stdout','rb') as inf:
894  shutil.copyfileobj(inf,outf)
895  for infile in glob.iglob('fort.2*'):
896  infile=str(infile)
897  logger.info('stdout.anl: %s'%(infile,))
898  with open(infile,'rb') as inf:
899  shutil.copyfileobj(inf,outf)
900  stdoutanl=tgtpre+'.stdout.anl'
902  'stdout.anl',stdoutanl,keep=False,logger=logger)
903 
904  def _make_ensemble_spread(self,tgtpre,logger):
905  """Do not call this function directly; it is part of the
906  implementation of make_diag_files. It delivers ensemble
907  spread binary and ctl files to com directory."""
908 
909  logger.info('deliver ensemble spread files')
910  ctl=tgtpre+'.ens_spread.ctl'
911  grd=tgtpre+'.ens_spread.grd'
912  if isnonempty('ens_spread.ctl'):
914  'ens_spread.ctl',ctl,keep=False,logger=logger)
916  'ens_spread.grd',grd,keep=False,logger=logger)
917 
918  def _make_diag_for(self,tgtpre,dtype,logger):
919  """!Generates one diagnostic output file.
920 
921  Do not call this function directly. It is part of the
922  internal implementation of make_diag_files. Each worker
923  thread calls this function for one GSI data type in the
924  GSI_DATA_TYPES array, to create two zlib-compressed (gzipped)
925  copies of various diagnostic files for that data type. There
926  are two such files per datatype: one for the first guess
927  ("ges") and one for the analysis ("anl").
928  @param tgtpre the prefix to the output names, including the full path
929  @param logger a logging.Logger to use for logging messages
930  @param dtype the string name of this datatype"""
931  for loop in [ '01', '03' ]:
932  string=loop
933  if string=='01': string='ges'
934  if string=='03': string='anl'
935  theglob='pe*.%s_%s*'%(dtype,loop)
936  globbed=sorted(glob.glob(theglob))
937  if not globbed:
938  logger.warning('No %s'%(theglob,))
939  continue
940  tmpfile='diag_%s_%s.gz'%(dtype,string)
941  outfile=tgtpre+'.'+tmpfile
942  logger.info('%s: gzip compress %d %s'
943  %(tmpfile,len(globbed),theglob))
944 
945  blocksize=1048576*64
946  outf=None
947  try:
948  outf=gzip.open(tmpfile,'wb')
949  for infile in globbed:
950  logger.info('%s: gzip compress %s'%(tmpfile,infile))
951  with open(infile,'rb') as inf:
952  while True:
953  indata=inf.read(blocksize)
954  if indata is None or indata=='':
955  break # end of input file
956  outf.write(indata)
957  finally:
958  if outf is not None:
959  outf.close()
960  del outf
962  tmpfile,outfile,keep=False,logger=logger)
963 
964  def run(self):
965  """!Runs the GSI and delivers the results.
966 
967  Executes the GSI in a temporary scrub directory, deleting it
968  afterwards if self.scrub is False. Follows this overall
969  pattern:
970  1. Make temporary area and cd there
971  2. Copy inputs
972  3. Run the grab_more_inputs(), which subclasses should override
973  4. Calls make_gsi_namelist() to generate the namelist
974  5. Calls before_gsi() which subclasses should override
975  6. Calls run_gsi_exe() to run the actual GSI program
976  7. Calls after_gsi() which subclasses should override
977  8. Calls deliver_products() to copy files to COM
978  9. Generates diagnostic files.
979  10. Deletes the temporary directory if self.scrub=False"""
980  logger=self.log()
981  dirname=self.workdir
982  logger.info('Run gsi in directory %s'%(dirname,))
983  if os.path.exists(dirname):
984  logger.info('Delete old data in %s'%(dirname,))
985  shutil.rmtree(dirname)
986  with NamedDir(dirname,keep=not self.scrub):
987  self.grab_fix_parm()
988  self.grab_enkf_input()
989  if not self.confbool('singleobstest',False):
990  self.grab_bufr()
991  self.grab_prepbufr()
992  self.write_vitals()
993  else:
994  logger.info('This is a single obs test.')
995  self.grab_bias_satang()
996  self.grab_more_inputs()
997  self.copy_wrf_inout()
998  self.make_gsi_namelist()
999 
1000  self.before_gsi()
1001  self.run_gsi_exe()
1002  self.after_gsi()
1003 
1004  self.deliver_products()
1005 
1006  diagpre=self.confstr('diagpre','')
1007  diagthreads=self.confint('diagthreads',10)
1008  if diagpre is None or diagpre=='':
1009  logger.info('GSI diagnostic outputs are disabled.')
1010  else:
1011  self.make_diag_files(diagpre,diagthreads)
1012 
1013  self.state=COMPLETED
1014 
1015  def grab_fix_parm(self):
1016  """!Links or copies to the local directory any fix or parm
1017  files needed by GSI."""
1018  logger=self.log()
1019  logger.info("Copying fix and parm files")
1020  s=self.icstr
1021  def lnsf(a,b):
1022  produtil.fileop.make_symlink(a,b,logger=logger,force=True)
1023 
1024  if self.confbool('sat_radiance_da',True):
1025  if self.confbool('use_gfs_stratosphere',True):
1026  anavinfo=s('{FIXgsi}/anavinfo_hwrf_L75')
1027  elif (self.confbool('hwrf_43lev_conf',False,section='prelaunch') or self.confbool('hwrf_other_conf',False,section='prelaunch')):
1028  anavinfo=s('{FIXgsi}/anavinfo_hwrf_L42')
1029  else:
1030  anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60')
1031  else:
1032  if (self.confbool('hwrf_43lev_conf',False,section='prelaunch') or self.confbool('hwrf_other_conf',False,section='prelaunch')):
1033  anavinfo=s('{FIXgsi}/anavinfo_hwrf_L42_nooz')
1034  else:
1035  anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60_nooz')
1036  berror=s('{FIXgsi}/nam_glb_berror.f77.gcv')
1037  emiscoef_IRwater=s('{FIXcrtm}/EmisCoeff/IR_Water/Big_Endian/Nalli.IRwater.EmisCoeff.bin')
1038  emiscoef_IRice=s('{FIXcrtm}/EmisCoeff/IR_Ice/SEcategory/Big_Endian/NPOESS.IRice.EmisCoeff.bin')
1039  emiscoef_IRland=s('{FIXcrtm}/EmisCoeff/IR_Land/SEcategory/Big_Endian/NPOESS.IRland.EmisCoeff.bin')
1040  emiscoef_IRsnow=s('{FIXcrtm}/EmisCoeff/IR_Snow/SEcategory/Big_Endian/NPOESS.IRsnow.EmisCoeff.bin')
1041  emiscoef_VISice=s('{FIXcrtm}/EmisCoeff/VIS_Ice/SEcategory/Big_Endian/NPOESS.VISice.EmisCoeff.bin')
1042  emiscoef_VISland=s('{FIXcrtm}/EmisCoeff/VIS_Land/SEcategory/Big_Endian/NPOESS.VISland.EmisCoeff.bin')
1043  emiscoef_VISsnow=s('{FIXcrtm}/EmisCoeff/VIS_Snow/SEcategory/Big_Endian/NPOESS.VISsnow.EmisCoeff.bin')
1044  emiscoef_VISwater=s('{FIXcrtm}/EmisCoeff/VIS_Water/SEcategory/Big_Endian/NPOESS.VISwater.EmisCoeff.bin')
1045  emiscoef_MWwater=s('{FIXcrtm}/EmisCoeff/MW_Water/Big_Endian/FASTEM6.MWwater.EmisCoeff.bin')
1046  aercoef=s('{FIXcrtm}/AerosolCoeff/Big_Endian/AerosolCoeff.bin')
1047  cldcoef=s('{FIXcrtm}/CloudCoeff/Big_Endian/CloudCoeff.bin')
1048  satinfo=s('{FIXgsi}/hwrf_satinfo.txt')
1049  atmsfilter=s('{FIXgsi}/atms_beamwidth.txt')
1050  scaninfo=s('{FIXgsi}/global_scaninfo.txt')
1051  satangl=s('{FIXgsi}/nam_global_satangbias.txt')
1052  pcpinfo=s('{FIXgsi}/nam_global_pcpinfo.txt')
1053  ozinfo=s('{FIXgsi}/global_ozinfo.txt')
1054  errtable=s('{FIXgsi}/hwrf_nam_errtable.r3dv')
1055  convinfo=s('{FIXgsi}/hwrf_convinfo.txt')
1056 
1057  # Only need this file for single obs test
1058  bufrtable=s('{FIXgsi}/prepobs_prep.bufrtable')
1059 
1060  # Only need this file for sst retrieval
1061  bftab_sst=s('{FIXgsi}/bufrtab.012')
1062 
1063  # NOTE: GSI now run for all storm depths.
1064  lnsf(anavinfo, './anavinfo')
1065  lnsf(berror, './berror_stats')
1066  lnsf(emiscoef_IRwater, './Nalli.IRwater.EmisCoeff.bin')
1067  lnsf(emiscoef_IRice, './NPOESS.IRice.EmisCoeff.bin')
1068  lnsf(emiscoef_IRsnow, './NPOESS.IRsnow.EmisCoeff.bin')
1069  lnsf(emiscoef_IRland, './NPOESS.IRland.EmisCoeff.bin')
1070  lnsf(emiscoef_VISice, './NPOESS.VISice.EmisCoeff.bin')
1071  lnsf(emiscoef_VISland, './NPOESS.VISland.EmisCoeff.bin')
1072  lnsf(emiscoef_VISsnow, './NPOESS.VISsnow.EmisCoeff.bin')
1073  lnsf(emiscoef_VISwater, './NPOESS.VISwater.EmisCoeff.bin')
1074  lnsf(emiscoef_MWwater, './FASTEM6.MWwater.EmisCoeff.bin')
1075  lnsf(aercoef, './AerosolCoeff.bin')
1076  lnsf(cldcoef, './CloudCoeff.bin')
1077  #lnsf(satangl, './satbias_angle')
1078  lnsf(satinfo, './satinfo')
1079  lnsf(scaninfo, './scaninfo')
1080  lnsf(pcpinfo, './pcpinfo')
1081  lnsf(ozinfo, './ozinfo')
1082  lnsf(convinfo, './convinfo')
1083  lnsf(errtable, './errtable')
1084  lnsf(atmsfilter, './atms_beamwidth.txt')
1085  lnsf(bufrtable, './prepobs_prep.bufrtable')
1086  lnsf(bftab_sst, './bftab_sstphr')
1087 
1088  # Copy CRTM coefficient files based on entries in satinfo file
1089  with open('satinfo','rt') as f:
1090  for line in f:
1091  splat=line.split()
1092  if not splat or splat[0][0]=='!': continue
1093  satsen=splat[0]
1094  spccoeff=s('{satsen}.SpcCoeff.bin',satsen=satsen)
1095  if not isnonempty(spccoeff):
1096  make_symlinks_in([
1097  s('{FIXcrtm}/SpcCoeff/Big_Endian/{spccoeff}',
1098  spccoeff=spccoeff),
1099  s('{FIXcrtm}/TauCoeff/Big_Endian/{satsen}.TauCoeff.bin',
1100  satsen=satsen) ],
1101  '.',force=True,logger=logger)
1102 
1104  """!Runs the GSI based on the HWRF FGAT scheme."""
1105  def __init__(self,dstore,conf,section,domain,wrf_in_prod,fgat_in_prods,
1106  sim,cycling_interval=None,taskname=None,atime=None,
1107  enkf_domains=None,ensda=None,**kwargs):
1108  """!The FGATGSI constructor:
1109  @param dstore passed to Datum: the Datastore object for this Task
1110  @param conf the conf object for this task (passed to HWRFTask)
1111  @param section the conf section for this task (passed to HWRFTask)
1112  @param domain the WRFDomain for this GSI. Must have been
1113  initialized by a WRFSimulation
1114  @param wrf_in_prod the Product for the wrfinput_d01 or
1115  ghost_d0* file for that domain
1116  @param fgat_in_prods a mapping from analysis time to a product
1117  for all of the FGAT times
1118  @param sim the hwrf.wrf.WRFSimulation that will be run as the forecast
1119  @param cycling_interval Optional: time between HWRF forecast
1120  cycles in seconds
1121  @param taskname Optional: the taskname for this product in the datastore
1122  @param atime the analysis time as a datetime.datetime.
1123  Default: conf.cycle
1124  @param enkf_domains a list of WRF domains that should be copied
1125  from the hwrf.ensda.EnsembleDA.
1126  @param ensda a subclass of hwrf.ensda.DAEnsemble that provides
1127  regional ensemble forecasts to generate the forecast error
1128  covariance
1129  @param kwargs ignored; passed to HWRFTask"""
1130  assert(wrf_in_prod is not None)
1131  if cycling_interval is None: cycling_interval=6*3600
1132  self.cycling_interval=to_timedelta(cycling_interval)
1133  if atime is None: atime=conf.cycle
1134  for prod in fgat_in_prods.itervalues():
1135  assert(isinstance(prod,produtil.datastore.Product))
1136  self._fgat_in_prod = [ (prod,to_datetime_rel(time,atime)) \
1137  for time,prod in fgat_in_prods.iteritems() ]
1138  assert(wrf_in_prod is not None)
1139  super(FGATGSI,self).__init__(dstore,conf,section,domain,wrf_in_prod,
1140  sim,taskname=taskname,atime=atime,enkf_domains=enkf_domains,
1141  ensda=ensda,**kwargs)
1142  ##@var cycling_interval
1143  # the time between forecast cycles, a datetime.timedelta
1144 
1145  def parent_fhrs(self):
1146  """!Iterates over FGAT forecast hours, relative to the parent
1147  analysis time.
1148 
1149  Iterates over all FGAT forecast hours (as ints) relative to
1150  the parent analysis time. For example, 3, 6, and 9 for
1151  three-hourly FGAT off of GDAS."""
1152  parent_atime=self.parent_atime
1153  for p,t in self._fgat_in_prod:
1154  yield int(round(to_fraction(t-parent_atime)/3600))
1155 
1156  def fgat_fhrs(self):
1157  """!Iterates over FGAT forecast hours relative to this model.
1158 
1159  Iterates over all FGAT forecast hours (as ints) relative to
1160  this model's analysis time. For example, -3, 0, and 3 for
1161  three-hourly FGAT off of GDAS."""
1162  atime=self.atime
1163  for p,t in self._fgat_in_prod:
1164  yield int(round(to_fraction(t-atime,negok=True)/3600))
1165 
1166  def grab_more_inputs(self):
1167  """!Links to the current working directory gdas native spectral
1168  output files for all FGAT hours."""
1169  atime=self.parent_atime
1170  logger=self.log()
1171  for hr in self.parent_fhrs():
1172  ftime = atime + datetime.timedelta(hours=hr)
1173  there=self._in_catalog.locate(
1174  self._sigma_dataset,self._sigma_item,
1175  atime=atime,ftime=ftime,logger=logger)
1176  if not isnonempty(there):
1177  raise GSIInputError(
1178  '%s %s: required input file is empty or non-existent: %s'
1179  %(self._sigma_dataset,self._sigma_item,there))
1180  here='gfs_sigf%02d'%(hr,)
1181  make_symlink(there,here,force=True,logger=logger)
1182 
1183  def copy_wrf_inout(self,filename='wrf_inout',others='wrf_inou%d'):
1184  """!Copy WRF analysis or input files to this directory.
1185 
1186  Copies the WRF analysis or input file to the specified
1187  filename. Also copies the wrf input files for other FGAT
1188  hours to separate files specified by "others". The "others"
1189  must be a format that includes at least one integer argument.
1190  @param filename ignored
1191  @param others a string format with a %d in it, used to
1192  generate output filenames"""
1193  logger=self.log()
1194  atime=self.atime
1195  names=dict()
1196  fci=to_fraction(self.cycling_interval)
1197  for p,t in self._fgat_in_prod:
1198  assert(isinstance(p,produtil.datastore.Product))
1199  dt=int(round(float((to_fraction(t-atime,negok=True) + fci)/
1200  3600.0)))
1201  name=others%dt
1202  logger.debug('add prod=%s time=%s'%(repr(p.did),repr(dt)))
1203  names[p]=name
1204 
1205  names[self._wrf_in_prod]='wrf_inout'
1206 
1207  for p,n in names.iteritems():
1208  logger.debug('prod %s has name %s'%(repr(p.did),repr(n)))
1209 
1210  def namer(p,logger,*args): return names[p]
1211  def actor(p,name,logger,*args):
1212  deliver_file(p.location,name,logger=logger)
1213  produtil.datastore.wait_for_products([a for a in names.iterkeys() ],
1214  logger,namer,actor)
1215 
1216 
1217 ###############################################################################
1218 
1219 def unset_gsistatus(conf,logger=None):
1220  """!Delete the gsi status file.
1221 
1222  Deletes all GSI status files, whose paths are determined from the
1223  given config object. If the logger is not specified, the
1224  gsistatus subdomain of the conf default logging domain is used.
1225  @param conf the hwrf.config.HWRFConfig object
1226  @param logger Optional: a logging.Logger for logging."""
1227  if logger is None: logger=conf.log('gsistatus')
1228  for onetwo in ( 'gsistatus', 'gsistatus2' ):
1229  gsistat=conf.get('dir',onetwo)
1230  gsistatfile=os.path.join(conf.getdir('com'),gsistat)
1231  produtil.fileop.remove_file(gsistatfile,info=True,logger=logger)
1232 
1233 def set_gsistatus(conf,logger=None):
1234  """!Sets the GSI status files.
1235 
1236  Set run_gsi_d02=YES (true) or =NO (false) depending on the
1237  configuration. If the logger is not specified, the gsistatus
1238  subdomain of the conf default logging domain is used.
1239  @param conf the hwrf.config.HWRFConfig object
1240  @param logger Optional: the logging.Logger for log messages."""
1241  if logger is None: logger=conf.log('set gsi status')
1242 
1243  tdrflagfile=conf.strinterp('dir','{com}/{stormlabel}.tdr')
1244  run_gsi=conf.getbool('config','run_gsi')
1245  conditional_gsid03=conf.getbool('config','conditional_gsid03',False)
1246  conditional_gsid02=conf.getbool('config','conditional_gsid02',False)
1247 
1248  for onetwo in ( 'gsistatus', 'gsistatus2' ):
1249  gsistat=conf.get('dir',onetwo)
1250  gsistatfile=os.path.join(conf.getdir('com'),gsistat)
1251 
1252  gsid02flag='YES' if run_gsi else 'NO'
1253  gsid03flag='YES' if run_gsi else 'NO'
1254  if gsid03flag=='YES' and conditional_gsid03 and \
1255  not isnonempty(tdrflagfile):
1256  logger.info('GSI is disabled for d03 because flag file is '
1257  'empty or does not exist: %s'%(tdrflagfile,))
1258  gsid03flag='NO'
1259  if gsid02flag=='YES' and conditional_gsid02 and \
1260  not isnonempty(tdrflagfile):
1261  logger.info('GSI is disabled for d02 because flag file is '
1262  'empty or does not exist: %s'%(tdrflagfile,))
1263  gsid02flag='NO'
1264 
1265  logger.info('Setting run_gsi_d02=%s in gsi status file %s'%(
1266  gsid02flag,gsistatfile))
1267  logger.info('Setting run_gsi_d03=%s in gsi status file %s'%(
1268  gsid03flag,gsistatfile))
1269  with open(gsistatfile,'wt') as f:
1270  f.write('run_gsi_d02=%s\nrun_gsi_d03=%s\n'%(
1271  gsid02flag,gsid03flag))
1272 
1273 def get_gsistatus(conf,domain,logger=None):
1274  """!Checks the gsi status for a specific domain.
1275 
1276  Checks the first GSI status file, scanning for information about
1277  the specified domain. If the file does not exist or cannot be
1278  opened or read, then False is returned. Otherwise, the file is
1279  scanned for run_gsi_d02=YES/NO or run_gsi_d03=YES/NO (case
1280  insensitive). The last of those run_gsi_d02/d03 lines is used:
1281  NO=return False, YES=return True.
1282  @param conf the hwrf.config.HWRFConfig object with configuration info
1283  @param domain either "gsi_d02" or "gsi_d03", the domain of interest
1284  @param logger Optional: the logging.Logger for log messages"""
1285  if domain!='gsi_d02' and domain!='gsi_d03':
1286  raise ValueError('In get_gsistatus, domain must be gsi_d02 '
1287  'or gsi_d03.')
1288  if logger is None: logger=conf.log('get gsi status')
1289 
1290  gsistat=conf.get('config','gsistatus')
1291  gsistatfile=os.path.join(conf.getdir('com'),gsistat)
1292  gsi_success=None
1293 
1294  logger.info('%s: scan gsi status file for run_%s=YES or NO'%(
1295  gsistatfile,domain))
1296 
1297  try:
1298  with open(gsistatfile,'rt') as f:
1299  for line in f:
1300  if line.find('run_%s=YES'%(domain))>=0:
1301  gsi_success=True
1302  logger.info(
1303  'gsi status file says: run_%s=YES'%(domain))
1304  elif line.find('run_%s=NO'%(domain))>=0:
1305  gsi_success=False
1306  logger.warning(
1307  'gsi status file says: run_%s=NO'%(domain))
1308  except EnvironmentError as e:
1309  logger.error(
1310  'Error checking gsi status file: %s'
1311  %(str(e),),exc_info=True)
1312  except Exception as ee:
1313  logger.error(
1314  'Unhandled exception while checking gsi status file: %s'
1315  %(str(ee),),exc_info=True)
1316  raise
1317 
1318  if gsi_success is None:
1319  logger.warning('Could not scan gsi status file for run_%s=YES'
1320  'or NO. Assuming run_%s=NO.'%(domain,domain))
1321  gsi_success=False
1322  elif gsi_success:
1323  logger.info(
1324  'run_%s=YES: gsi status file says gsi init succeeded.'%(domain))
1325  else:
1326  logger.warning(
1327  'run_%s=NO: gsi status file says gsi init failed or was aborted.'
1328  %(domain))
1329 
1330  return gsi_success
1331 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
Create namelists, monitor wrf simulations, generate filenames.
Definition: wrf.py:1
def netcdfver(filename)
What is the NetCDF version of this file?
Definition: fileop.py:177
def wrfout_copier(self, file)
Generate the wrfout file converter.
Definition: gsi.py:481
cycling_interval
the time between forecast cycles, a datetime.timedelta
Definition: gsi.py:1132
def make_diag_files(self, tgtpre, nthreads)
Creates GSI diagnostic files.
Definition: gsi.py:849
def set_ensda(self, ensda, enkf_domains)
Sets the hwrf.ensda.DAEnsemble to use, and the enkf domains.
Definition: gsi.py:179
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def _make_ensemble_spread(self, tgtpre, logger)
Definition: gsi.py:904
def grab_bias_satang(self)
Copies or links bias correction and satellite angle files.
Definition: gsi.py:777
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
A subclass of Product that represents file delivery.
Definition: datastore.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def wait_for_products
Waits for products to be available and performs an action on them.
Definition: datastore.py:979
def grab_more_inputs(self)
Links to the current working directory gdas native spectral output files for all FGAT hours...
Definition: gsi.py:1166
def remove_file
Deletes the specified file.
Definition: fileop.py:251
def grab_gfs_enkf(self, atime=None, kwargs)
Links the GFS ENKF files to this directory.
Definition: gsi.py:580
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
def domain(self)
The WRF domain for which GSI is being run.
Definition: gsi.py:672
def wrf_top_Pa(self)
The WRF model top in pascals.
Definition: gsi.py:160
def atime(self)
The analysis time of this GSI.
Definition: gsi.py:166
def grab_obstype_section
Copies or links observations.
Definition: gsi.py:303
Runs the GSI based on the HWRF FGAT scheme.
Definition: gsi.py:1103
def unset_gsistatus
Delete the gsi status file.
Definition: gsi.py:1219
global_ensemble_size
The number of members in the global (parent model) ensemble.
Definition: gsi.py:98
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
def get_gsistatus
Checks the gsi status for a specific domain.
Definition: gsi.py:1273
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def make_gsi_namelist
Creates the GSI namelist in the specified file.
Definition: gsi.py:688
def run_gsi_exe(self)
Runs the actual GSI executable.
Definition: gsi.py:796
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
A piece of data produced by a Task.
Definition: datastore.py:716
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
Contains the WorkPool class, which maintains pools of threads that perform small tasks.
Definition: workpool.py:1
def get
Alias for self.meta() Returns the value of the specified metadata key or returns default if it is uns...
Definition: datastore.py:637
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
def get_wrfanl(self, domain)
Obtain output ghost product for the specified domain.
Definition: gsi.py:651
used_regional_ensemble
Was the regional ensemble used for forecast error covariances?
Definition: gsi.py:97
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
Time manipulation and other numerical routines.
Definition: numerics.py:1
This module allows querying resource usage and limits, as well as setting resource limits...
Definition: rusage.py:1
def parent_atime(self)
Parent model analysis time.
Definition: gsi.py:171
def get_wrfinput(self)
Obtain output wrfinput data for the outermost WRF domain.
Definition: gsi.py:661
def grab_prepbufr(self, atime=None, kwargs)
Links or copies the prepbufr file to the local directory.
Definition: gsi.py:437
def inputiter(self)
Iterate over needed inputs.
Definition: gsi.py:194
def grab_more_inputs(self)
Called by run() to obtain additional inputs before before_gsi()
Definition: gsi.py:770
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def products
Iterates over all output products of this Task.
Definition: gsi.py:676
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def __init__(self, dstore, conf, section, domain, wrf_in_prod, sim, taskname=None, atime=None, parent_atime=None, enkf_domains=None, ensda=None, kwargs)
The GSIBase constructor:
Definition: gsi.py:60
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
Definition: namelist.py:1
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def before_gsi(self)
Called by run() just before running the gsi program.
Definition: gsi.py:763
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
A pool of threads that perform some list of tasks.
Definition: workpool.py:84
def log
Obtain a logging domain.
Definition: hwrftask.py:425
Insert config file data into a Fortran namelist file.
Definition: namelist.py:154
def after_gsi(self)
Called by run() after the gsi executable completes.
Definition: gsi.py:756
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward comaptibility.
Definition: hwrftask.py:328
Base class of anything that runs the GSI.
Definition: gsi.py:46
def copy_wrf_inout
Copies the WRF analysis or input file to the specified filename.
Definition: gsi.py:630
def _make_stdout_anl(self, tgtpre, logger)
Concatenates many "stdout" files into one.
Definition: gsi.py:881
def write_vitals
Writes the tcvitals (from self.storminfo) to the specified file.
Definition: gsi.py:471
def fgat_fhrs(self)
Iterates over FGAT forecast hours relative to this model.
Definition: gsi.py:1156
def _make_diag_for(self, tgtpre, dtype, logger)
Generates one diagnostic output file.
Definition: gsi.py:918
Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp...
Definition: input.py:109
def deliver_products(self)
Delivers output products.
Definition: gsi.py:834
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def run(self)
Runs the GSI and delivers the results.
Definition: gsi.py:964
def __init__(self, dstore, conf, section, domain, wrf_in_prod, fgat_in_prods, sim, cycling_interval=None, taskname=None, atime=None, enkf_domains=None, ensda=None, kwargs)
The FGATGSI constructor:
Definition: gsi.py:1107
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
def grab_wrf_enkf(self, ensda)
Links the WRF ENKF files to this directory.
Definition: gsi.py:508
def set_gsistatus
Sets the GSI status files.
Definition: gsi.py:1233
def get_ghost(self, domain)
Obtain output ghost product for the specified domain.
Definition: gsi.py:641
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
Definition: fileop.py:573
Raised when GSI cannot find a required input file.
Definition: exceptions.py:147
def parent_fhrs(self)
Iterates over FGAT forecast hours, relative to the parent analysis time.
Definition: gsi.py:1145
def taskvars(self)
The dict of object-local values used for string substitution.
Definition: hwrftask.py:243
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
Definition: hwrftask.py:351
def grab_enkf_input(self)
Link or copy ensemble inputs.
Definition: gsi.py:268
hybrid_da
Was hybrid 3DVAR-ENKF data assimilation used?
Definition: gsi.py:99
def copy_wrf_inout
Copy WRF analysis or input files to this directory.
Definition: gsi.py:1183
def grab_bufr
Link bufr files.
Definition: gsi.py:286
def grab_fix_parm(self)
Links or copies to the local directory any fix or parm files needed by GSI.
Definition: gsi.py:1015
def make_symlink
Creates a symbolic link "target" that points to "source".
Definition: fileop.py:677