1 """!Runs the GSI data assimilation on the HWRF system."""
5 __all__ = [
'GSI_DATA_TYPES',
'GSIBase',
'FGATGSI' ]
7 import os, shutil, glob, gzip
19 from produtil.run import mpirun, mpi, openmp, checkrun, bigexe, runsync
20 from hwrf.numerics import to_datetime,to_datetime_rel, to_fraction, \
27 "hirs2_n14",
"msu_n14",
"sndr_g08",
"sndr_g11",
"sndr_g12",
"sndr_g13",
28 "sndr_g08_prep",
"sndr_g11_prep",
"sndr_g12_prep",
"sndr_g13_prep",
29 "sndrd1_g11",
"sndrd2_g11",
"sndrd3_g11",
"sndrd4_g11",
"sndrd1_g12",
30 "sndrd2_g12",
"sndrd3_g12",
"sndrd4_g12",
"sndrd1_g13",
"sndrd2_g13",
31 "sndrd3_g13",
"sndrd4_g13",
"sndrd1_g14",
"sndrd2_g14",
"sndrd3_g14",
32 "sndrd4_g14",
"sndrd1_g15",
"sndrd2_g15",
"sndrd3_g15",
"sndrd4_g15",
33 "hirs3_n15",
"hirs3_n16",
"hirs3_n17",
"amsua_n15",
"amsua_n16",
34 "amsua_n17",
"amsub_n15",
"amsub_n16",
"amsub_n17",
"hsb_aqua",
35 "airs_aqua",
"amsua_aqua",
"imgr_g08",
"imgr_g11",
"imgr_g12",
36 "imgr_g14",
"imgr_g15",
"pcp_ssmi_dmsp",
"pcp_tmi_trmm",
"conv",
37 "sbuv2_n16",
"sbuv2_n17",
"sbuv2_n18",
"sbuv2_n19",
"gome_metop-a",
38 "omi_aura",
"mls_aura",
"ssmi_f13",
"ssmi_f14",
"ssmi_f15",
39 "hirs4_n18",
"hirs4_metop-a",
"amsua_n18",
"amsua_metop-a",
"mhs_n18",
40 "mhs_metop-a",
"amsre_low_aqua",
"amsre_mid_aqua",
"amsre_hig_aqua",
41 "ssmis_f16",
"ssmis_f17",
"ssmis_f18",
"ssmis_f19",
"ssmis_f20",
42 "iasi_metop-a",
"hirs4_n19",
"amsua_n19",
"mhs_n19",
"seviri_m08",
43 "seviri_m09",
"seviri_m10",
"cris_npp",
"atms_npp",
"hirs4_metop-b",
44 "amsua_metop-b",
"mhs_metop-b",
"iasi_metop-b",
"gome_metop-b" ]
47 """!Base class of anything that runs the GSI. Do not use directly."""
58 def __init__(self,dstore,conf,section,domain,wrf_in_prod,sim,
59 taskname=
None,atime=
None,parent_atime=
None,
60 enkf_domains=
None,ensda=
None,**kwargs):
61 """!The GSIBase constructor:
62 @param dstore passed to Datum: the Datastore object for this Task
63 @param conf the conf object for this task (passed to HWRFTask)
64 @param section the conf section for this task (passed to HWRFTask)
65 @param domain the WRFDomain for this GSI. Must have been
66 initialized by a WRFSimulation
67 @param wrf_in_prod the Product for the wrfinput_d01 or
68 ghost_d0* file for that domain
69 @param sim the hwrf.wrf.WRFSimulation that will be run as the forecast
70 @param taskname Optional: the taskname for this product in the datastore
71 @param atime the analysis time as a datetime.datetime.
73 @param parent_atime the analysis time of the parent model. This is
74 relevant if the parent model's forecast is used as the
76 @param enkf_domains a list of WRF domains that should be copied
77 from the hwrf.ensda.EnsembleDA.
78 @param ensda a subclass of hwrf.ensda.DAEnsemble that provides
79 regional ensemble forecasts to generate the forecast error
81 @param kwargs ignored; passed to HWRFTask"""
82 super(GSIBase,self).
__init__(dstore,conf,section,taskname=taskname,
86 if atime
is None: atime=conf.cycle
87 if parent_atime
is None:
88 parent_atime=to_datetime_rel(-6*3600,atime)
89 self.
_atime=to_datetime(atime)
95 self.
_tdrflagfile=os.path.join(self.conf.getdir(
'com'), \
96 self.
icstr(
'{stormlabel}.tdr'))
100 if enkf_domains
is not None:
101 for domain
in enkf_domains:
102 self._enkf_domains.append(domain)
103 assert(wrf_in_prod
is not None)
104 assert(wrf_in_prod.location
is not None)
105 assert(wrf_in_prod.location!=
'')
108 with dstore.transaction()
as t:
109 prodname=
'gsi_out_'+self.domain.name
111 location=os.path.join(self.
outdir,prodname))
115 def getc(what,default):
117 return str(kwargs[what])
119 if s
is not None and s!=
'':
140 if 'in_catalog' in kwargs:
141 ink=kwargs[
'in_catalog']
144 elif isinstance(ink,basestring):
146 elif ink
is None:
pass
149 'In hwrf.gsi.GSIBase.__init__, in_catalog must be None, '
150 'a basestring or a DataCatalog. You provided an object '
151 'of type %s, with value %s.'
152 %(type(ink).__name__,repr(ink)))
154 if incat_name
is None:
155 incat_name=self.
confstr(
'catalog')
161 """!The WRF model top in pascals"""
162 p_top_requested=self._sim.nl.nl_get(
'domains',
'p_top_requested')
163 return float(p_top_requested)
167 """!The analysis time of this GSI."""
172 """!Parent model analysis time.
174 The analysis time of the parent data assimilation models
175 (i.e., GDAS, GFS ENKF). This is usually six hours before
180 """!Sets the hwrf.ensda.DAEnsemble to use, and the enkf domains.
182 Specifies the ensemble to use for forecast error covariances,
183 and the list of hwrf.wrf.WRFDomain domains that should be
185 @param ensda the hwrf.ensda.DAEnsemble that provides short
186 simulation output, for the forecast error covariances
187 @param enkf_domains an iterable of enkf domains, WRFDomain
188 objects, identifying which domains in ensda should be used."""
191 for dom
in enkf_domains:
192 self._enkf_domains.append(dom)
195 """!Iterate over needed inputs.
197 Iterates over all files external to this workflow that are
198 required to run the GSI. This may include the GFS ENKF,
199 prepbufr, bufr, and other files. This is used by the
200 hwrf.input module to obtain those inputs."""
202 maxmemb=self.
confint(
'num_enkf',80)
205 enkf_age=self.
conffloat(
'enkf_age_hr',6.0)
207 atime=to_datetime_rel(-enkf_age*3600.0,my_atime)
208 ftime=to_datetime_rel(3600*fhour,atime)
209 for zmemb
in xrange(maxmemb):
212 item=self.
_enkf_item,atime=atime,enkfmem=imemb,
217 if self.
confbool(
'use_newradbc',
False):
242 for section
in olist.split(
','):
244 if len(trim)<=0
or trim
in touched:
continue
245 dataset=self.conf.get(section,
'dataset')
246 item=self.conf.get(section,
'item')
247 otype=self.conf.get(section,
'type').lower()
248 if otype==
'satellite' and not self.
confbool(
'sat_radiance_da',
True):
250 if otype==
'satwnd' and not self.
confbool(
'sat_wnd_da',
True):
252 for (localname,obstype)
in self.conf.items(section):
253 if localname
in [
'dataset',
'item',
'type']:
256 yield dict(self.
taskvars,dataset=dataset,item=item,
257 obstype=obstype,atime=atime,optional=
True)
263 for hr
in self.parent_fhrs():
264 ftime=atime+datetime.timedelta(hours=hr)
269 """!Link or copy ensemble inputs.
271 Links or copies the output of the last ENKF or hwrf.ensda
272 cycle. Calls grab_wrf_enkf() if any enkf_domains were given
273 to the constructor. Otherwise, calls grab_gfs_enkf() to get
274 the global enkf. Will also revert to grab_gfs_enkf() if the
275 ensda should be used, but was unavailable."""
277 use_hwrf_ensemble=self.
confbool(
'use_hwrf_ensemble',
False)
278 ensda_when=self.
confstr(
'ensda_when',
'tdr_next_cycle').lower()
281 or ensda_when==
'always'):
289 Links or copies all needed bufr files to the local directory.
290 If sat_da is False, satellite obs will be omitted.
291 @param atime the analysis time to use when specifying the
293 @param morevars additional variables to pass for string
294 replacement when expanding bufr filenames in
295 configuration (hwrf.config.HWRFConfig) sections."""
298 for osection
in olist.split(
','):
299 trim=osection.strip()
300 if len(trim)>0
and not trim
in touched:
304 """!Copies or links observations.
306 @param section the obstype section to read
307 @param atime the atime for string expansion when finding bufr files
308 @param morevars more variables for string expansion when
311 Copies or links observations specified in the obstype sections
312 of the configuration file to the current working directory.
314 The section listed in self.section should contain an "obstype"
315 option, whose value is a comma separated list of section
316 names. This method reads every section in that list. For example,
320 catalog = {input_catalog}
322 obstypes = hdob_obstype,sat_obstypes,tdr_new_obstype
339 For each section, the option keys are the local directory
340 filenames expected by GSI, while the values are the data type
341 part of the operational filename (e.g., the satwind in
342 gfs.t12z.tm00.satwind.bufr_d). There are a few special keys:
344 dataset - the name of the dataset for hwrf.input purposes
345 item - the name of the item for hwrf.input purposes
346 type - the type of observation: satellite, satwnd, or anything else.
347 At present, only "satellite" and "satwnd" have any special meaning.
349 If the type is "satellite" then the entire section will be
350 skipped if sat_radiance_da=False in this task's config section.
352 If the type is "satwnd" then the entire section will be
353 skipped if sat_wnd_da=False in this task's config section.
355 Once the section is parsed, the files are all linked to this
358 if not isinstance(section,basestring): section=str(section)
363 atime=to_datetime_rel(atime,self.
atime)
365 dataset=self.conf.get(section,
'dataset')
366 item=self.conf.get(section,
'item')
367 otype=self.conf.get(section,
'type').lower()
369 logger.warning(
'process obs section %s with dataset=%s item=%s '
370 'type=%s'%(section,dataset,item,otype))
372 if otype==
'satellite':
373 if self.
confbool(
'sat_radiance_da',
True):
374 logger.warning(
'%s: satellite radiance DA '
375 'is enabled. Continuing...'%(section,))
377 logger.warning(
'%s: satellite radiance DA is disabled in hwrf.conf. '
378 'Not linking bufr files from this section.'
383 if self.
confbool(
'sat_wnd_da',
True):
384 logger.warning(
'%s: assimilation of satellite wind '
385 'is enabled. Continuing...'%(section,))
387 logger.warning(
'%s: assimilation of satellite wind is disabled in hwrf.conf. '
388 'Not linking bufr files from this section.'
393 items=self.conf.items(section)
394 otdict=dict( [ (v,k)
for k,v
in items ] )
395 namer=
lambda f,t: otdict[t]
397 for localname,obstype
in items:
398 if localname
in [
'dataset',
'item',
'type']:
continue
399 obstypes.append(obstype)
401 for obstype
in obstypes:
402 logger.warning(
'Find obstype=%s in dataset=%s item=%s'
403 %(obstype,dataset,item))
404 if not isinstance(obstype,basestring):
406 'In gsi.GSIBase.link_bufr, the obstypes parameter must '
407 'be an iterable container of basestrings. One of the '
408 'elements was a %s (value %s) instead.'
409 %(type(obstype).__name__,repr(obstype)))
410 if dataset ==
'tdr' and bool(self.
realtime):
411 ds=os.path.join(self.
getdir(
'intercom'),
'bufrprep')
412 it=self._in_catalog.parse(item,atime=atime,
413 logger=logger,obstype=
'tldplr')
414 there=os.path.join(ds,it)
416 there=self._in_catalog.locate(dataset,item,atime=atime,
417 logger=logger,obstype=obstype)
418 if there
is None or there==
'':
419 msg=
'%s: Could not find a location for this obstype.'\
422 if self.
confbool(
'require_all_bufrs',
False):
425 bn=os.path.basename(there)
428 make_symlink(there,on,logger=logger,force=
True)
430 msg=
'%s: Observation file is empty or non-existent: %s'\
433 if self.
confbool(
'require_all_bufrs',
False):
438 """!Links or copies the prepbufr file to the local directory.
440 @param atime the analysis time, or time relative to
441 self.atime. Used for string expansion in the
442 hwrf.config.HWRFConfig.
443 @param kwargs also passed to the hwrf.config.HWRFConfig
444 for string expansion"""
448 atime=to_datetime_rel(atime,self.
atime)
451 atime=atime,logger=logger,**kwargs)
452 if self.conf.getint(
'bufrprep',
'prepbufrprep',0) == 0:
454 atime=atime,logger=logger,**kwargs)
456 ds=os.path.join(self.
getdir(
'intercom'),
'bufrprep')
458 atime=atime,logger=logger,**kwargs)
459 there=os.path.join(ds,it)
460 if there
is None or there==
'':
461 msg=
'Could not find the prepbufr file (item=%s dataset=%s)' \
466 msg=there+
': is non-existent or empty'
469 make_symlink(there,
'prepbufr',logger=logger,force=
True)
472 """!Writes the tcvitals (from self.storminfo) to the specified
474 @param filename the file to receive the tcvitals"""
476 logger.info(
'Writing tcvitals to %s'%(repr(filename),))
477 with open(filename,
'wt')
as f:
478 f.write(self.storminfo.as_tcvitals()+
"\n")
479 assert(os.path.exists(filename))
482 """!Generate the wrfout file converter.
484 Returns the "copier" argument to deliver_file to use to copy
485 the specified file. Will be None, unless the file is HDF5, in
486 which case it will be "ncks -6 source target" to decompress
487 and convert to NetCDF3 style (uncompressed) with 64-bit
488 indexing. If ncks is missing, such a conversion is
489 impossible, so None is returned.
491 @param file the file that is to be copied"""
493 ncks=self.
getexe(
'ncks',
'')
495 logger.warning(
'ncks path not specified in conf [exe] '
496 'so I will search your $PATH')
501 if self.
confbool(
'sync_frequently',
True):
503 checkrun(bigexe(ncks)[
'-6',s,t]<
'/dev/null',logger=logger)
509 """!Links the WRF ENKF files to this directory.
510 @param ensda the hwrf.ensda.DAEnsemble that provides the files"""
512 logger.info(
'in grab_wrf_enkf')
516 for (ens,member)
in ensda.members_at_time(self.
atime):
517 logger.info(
'ens,member = %s,%s'%(repr(ens),repr(member)))
520 logger.info(
'ens,member,domain = %s,%s,%s'
521 %(repr(ens),repr(member),repr(domain)))
522 did=int(domain.get_grid_id())
523 if self.conf.getbool(
'config',
'run_ens_relocation'):
527 prod=member.rstage3.get_wrfout(domain=domain)
532 prod=member.get_wrfanl(domain=domain,atime=self.
atime)
534 logger.info(
'No product for domain %s.'%(str(domain),))
536 logger.info(
'domain %s prod %s'%(str(domain),prod.did))
537 nameme[prod]=
'wrf_en%03d'%iens
539 assert(did
is not None)
540 def renamer(p,l):
return nameme[p]
541 def copy(fromfile,tofile,copier):
542 deliver_file(fromfile,tofile,logger=logger,keep=
True,copier=copier)
543 def link(fromfile,tofile):
550 l.info(
'Link %s to %s'%(fromfile,n))
552 l.info(
'%s: file is HDF5. I will assume your GSI was built with support for compressed NetCDF4'%(fromfile,))
555 l.info(
'%s: file is NetCDF3.'%(fromfile,))
559 workpool.add_work(link,[fromfile,n])
561 workpool.add_work(copy,[fromfile,n,copier])
563 maxtime=self.
confint(
'maxwait',240)
569 plist,logger,renamer,actor,maxtime=maxtime)
574 logger.critical(
'Found only %d of %d WRF ENKF files - '
575 'using GFS ENKF instead.'%(count,wanted))
577 self.
postmsg(
'Found all %d of %d WRF ENKF files.'%(count,wanted))
581 """!Links the GFS ENKF files to this directory
582 @param atime the analysis time, or time relative to
583 self.atime. Used for string expansion in the
584 hwrf.config.HWRFConfig.
585 @param kwargs also passed to the hwrf.config.HWRFConfig
586 for string expansion"""
588 maxmemb=self.
confint(
'num_enkf',80)
590 logger.warning(
'Number of ENKF members requested is 0. Will '
591 'not link anything.')
595 enkf_age=self.
conffloat(
'enkf_age_hr',6.0)
597 assert(my_atime
is not None)
598 atime=to_datetime_rel(-enkf_age*3600.0,my_atime)
599 ftime=to_datetime_rel(3600*fhour,atime)
600 with open(
'filelist06',
'wt')
as fl:
602 for zmemb
in xrange(maxmemb):
606 enkfmem=imemb,ftime=ftime,**kwargs)
610 'is empty or non-existent: %s'
613 logger.warning(there+
': is empty or non-existent')
615 localmemb=localmemb+1
617 'sfg_{aYMDH}_fhr{fahr:02d}s_mem{imemb:03d}',
618 atime=atime,ftime=ftime,imemb=localmemb)
624 logger.info(
'Ensemble member is less than 40!!!'
625 'Will run 3DVAR instead of hybrid analysis')
631 """!Copies the WRF analysis or input file to the specified
633 @param filename the file to receive the data"""
635 def namer(p,logger,*args):
return filename
636 def actor(p,name,logger,*args):
637 deliver_file(p.location,name,logger=logger)
642 """!Obtain output ghost product for the specified domain.
644 If this GSI is being run on the specified domain, returns the
645 output product of GSI, otherwise returns None.
646 @param domain the WRFDomain of interest"""
652 """!Obtain output ghost product for the specified domain.
654 If this GSI is being run on the specified domain, returns
655 the output product of GSI, otherwise returns None.
656 @param domain the WRFDomain of interest"""
662 """!Obtain output wrfinput data for the outermost WRF domain.
664 If this GSI is being run on the WRF outermost domain (Mother
665 Of All Domains, or MOAD), returns the output product of GSI.
666 Otherwise, returns None."""
667 if self.domain.is_moad():
673 """!The WRF domain for which GSI is being run."""
676 def products(self,domains=None,prodtype='wrf_out_prod'):
677 """!Iterates over all output products of this Task.
678 @param domains the domains of interest
679 @param prodtype ignored"""
683 for domain
in domains:
689 """!Creates the GSI namelist in the specified file.
690 @param filename the destination filename"""
694 use_gfs_stratosphere=self.
confbool(
'use_gfs_stratosphere',
True)
696 invars.update( USE_GFS_STRATOSPHERE=use_gfs_stratosphere,
698 REGIONAL_OZONE=
False,
701 invars.update( USE_GFS_STRATOSPHERE=
False,
703 REGIONAL_OZONE=
False,
706 if not self.
confbool(
'use_newradbc',
False):
707 SETUP=
'''upd_pred(1)=0,upd_pred(2)=0,upd_pred(3)=0,
708 upd_pred(4)=0,upd_pred(5)=0,upd_pred(6)=0,
709 upd_pred(7)=0,upd_pred(8)=0,upd_pred(9)=0,
710 upd_pred(10)=0,upd_pred(11)=0,upd_pred(12)=0,'''
712 SETUP=
'''newpc4pred=.true., adp_anglebc=.true., angord=4,
713 passive_bc=.false., use_edges=.false., emiss_bc=.true.,
714 diag_precon=.true., step_start=1.e-3, upd_pred(1)=0,
715 upd_pred(2)=0,upd_pred(3)=0,upd_pred(4)=0,
716 upd_pred(5)=0,upd_pred(6)=0,upd_pred(7)=0,
717 upd_pred(8)=0,upd_pred(9)=0,upd_pred(10)=0,
718 upd_pred(11)=0,upd_pred(12)=0,'''
727 invars.update(NLAT=ny-1, NLON=nx-1, NETCDF=
True, LEVS=nz-1,
728 NLON_ENS_REGIONAL=nx-1,SETUP=SETUP,
729 NLAT_ENS_REGIONAL=ny-1)
730 invars.update(tdrtype=
'tldplrbufr')
731 invars.update(gps_dtype=
'gps_bnd')
733 singleobstest=self.
confbool(
'singleobstest',
False)
734 invars.update(ONEOBSTEST=singleobstest)
737 logger.info(
'Using regional ensemble.')
738 invars.update( MERGE_TWO_GRID_ENSPERTS=
False,
739 REGIONAL_ENSEMBLE_OPTION=2,
740 ENSEMBLE_SIZE_REGIONAL=self._ensda.nmembers)
742 logger.info(
'Using global ensemble.')
743 invars.update( HYBENS_REGIONAL=self.
hybrid_da,
746 nml_file=self.
confstr(
'nml_file')
751 with open(nml_file,
'rt')
as nf:
752 with open(filename,
'wt')
as of:
753 of.write(ni.parse(nf,logger=logger,source=nml_file,
754 raise_all=
True,atime=atime,**invars))
757 """!Called by run() after the gsi executable completes.
759 This is intended to be overridden by subclasses to perform
760 some action after gsi is complete, but before products are
761 delivered. The default implementation does nothing."""
764 """!Called by run() just before running the gsi program.
766 This is intended to be overridden by subclasses to perform
767 some action after all inputs needed for gsi are available, but
768 before gsi starts. The default implementation does nothing."""
771 """!Called by run() to obtain additional inputs before before_gsi()
773 This is intended to be overridden by subclasses to copy or
774 link more inputs for GSI. The default implementation does
778 """!Copies or links bias correction and satellite angle files"""
781 parent_atime=self.parent_atime
783 logger.info(
'%s: search for this at dataset=%s item=%s time=%s'
784 %(fn,ds,it,parent_atime.strftime(
'%Y%m%d%H')))
785 there=self._in_catalog.locate(ds,it,atime=parent_atime,
786 logger=logger,ftime=parent_atime)
787 logger.info(
'%s: found at %s'%(fn,there))
788 make_symlink(there,fn,force=
True,logger=logger)
789 if self.confbool(
'use_newradbc',
False):
790 get(self._biascr_dataset,self._biascr_item,
'satbias_in')
791 get(self._biascr_dataset,self._biascr_pc_item,
'satbias_pc')
793 get(self._biascr_dataset,self._abias_item,
'satbias_in')
794 get(self._biascr_dataset,self._satang_item,
'satbias_angle')
797 """!Runs the actual GSI executable."""
799 if self.
confbool(
'sync_frequently',
True):
801 cmd = (mpirun(mpi(self.
getexe(
'gsi')),allranks=
True)\
802 .env(OMP_STACKSIZE=
'128M') <
'gsiparm.anl')
807 logger.warning(repr(cmd))
811 self.
postmsg(
'Starting GSI for %s domain'%(str(self.
domain),))
813 sleeptime=self.
conffloat(
'sleeptime',30.0)
816 while ntries<maxtries:
819 getrlimit(logger=logger)
820 with rusage(logger=logger):
821 checkrun(cmd,sleeptime=sleeptime)
823 except Exception
as e:
825 logger.critical(
'GSI failed for %s domain: %s'
826 %(str(self.
domain),str(e)),exc_info=
True)
829 logger.warning(
'GSI failed for %s domain: %s, will retry %d more time(s)'
830 %(str(self.
domain),str(e),maxtries-ntries),
832 self.
postmsg(
'GSI succeeded for %s domain'%(str(self.
domain),))
835 """!Delivers output products.
837 This function is called by run() to deliver output files to
838 the intercom or com directory and record in the database that
839 they are delivered."""
840 d=os.path.dirname(self._wrf_out_prod.location)
843 self._wrf_out_prod.deliver(frominfo=
'wrf_inout',keep=
False,
846 'satbias_out',os.path.join(self.
outdir,
'satbias_out'),
847 keep=
False,logger=logger)
850 """!Creates GSI diagnostic files.
852 Makes some diagnostic files and copies them to the specified
853 delivery location. Part of this routine is threaded: specify
854 the number of worker threads in nthreads. Minimum is 1.
855 @param tgtpre the prefix to the output names, including the full path
856 @param nthreads the maximum number of threads to use"""
858 assert(isinstance(tgtpre,basestring))
859 assert(os.path.isabs(tgtpre))
865 logger.info(
'make diag *.gz files')
876 for dtype
in GSI_DATA_TYPES:
881 def _make_stdout_anl(self,tgtpre,logger):
882 """! Concatenates many "stdout" files into one.
884 Do not call this function directly; it is part of the
885 implementation of make_diag_files. It concatenates "stdout"
886 and fort.2* to a single log file.
887 @param tgtpre the prefix to the output names, including the full path
888 @param logger a logging.Logger to use for logging messages"""
890 logger.info(
'stdout.anl: make this from stdout and fort.2*')
891 with open(
'stdout.anl',
'wb')
as outf:
892 logger.info(
'stdout.anl: stdout')
893 with open(
'stdout',
'rb')
as inf:
894 shutil.copyfileobj(inf,outf)
895 for infile
in glob.iglob(
'fort.2*'):
897 logger.info(
'stdout.anl: %s'%(infile,))
898 with open(infile,
'rb')
as inf:
899 shutil.copyfileobj(inf,outf)
900 stdoutanl=tgtpre+
'.stdout.anl'
902 'stdout.anl',stdoutanl,keep=
False,logger=logger)
904 def _make_ensemble_spread(self,tgtpre,logger):
905 """!Delivers ensemble spread files. Do not call this function
906 directly; it is part of the implementation of make_diag_files. It
907 delivers the ensemble spread binary and ctl files to the com directory."""
909 logger.info(
'deliver ensemble spread files')
910 ctl=tgtpre+
'.ens_spread.ctl'
911 grd=tgtpre+
'.ens_spread.grd'
912 if isnonempty(
'ens_spread.ctl'):
914 'ens_spread.ctl',ctl,keep=
False,logger=logger)
916 'ens_spread.grd',grd,keep=
False,logger=logger)
918 def _make_diag_for(self,tgtpre,dtype,logger):
919 """!Generates one diagnostic output file.
921 Do not call this function directly. It is part of the
922 internal implementation of make_diag_files. Each worker
923 thread calls this function for one GSI data type in the
924 GSI_DATA_TYPES array, to create two zlib-compressed (gzipped)
925 copies of various diagnostic files for that data type. There
926 are two such files per datatype: one for the first guess
927 ("ges") and one for the analysis ("anl").
928 @param tgtpre the prefix to the output names, including the full path
929 @param logger a logging.Logger to use for logging messages
930 @param dtype the string name of this datatype"""
931 for loop
in [
'01',
'03' ]:
933 if string==
'01': string=
'ges'
934 if string==
'03': string=
'anl'
935 theglob=
'pe*.%s_%s*'%(dtype,loop)
936 globbed=sorted(glob.glob(theglob))
938 logger.warning(
'No %s'%(theglob,))
940 tmpfile=
'diag_%s_%s.gz'%(dtype,string)
941 outfile=tgtpre+
'.'+tmpfile
942 logger.info(
'%s: gzip compress %d %s'
943 %(tmpfile,len(globbed),theglob))
948 outf=gzip.open(tmpfile,
'wb')
949 for infile
in globbed:
950 logger.info(
'%s: gzip compress %s'%(tmpfile,infile))
951 with open(infile,
'rb')
as inf:
953 indata=inf.read(blocksize)
954 if indata
is None or indata==
'':
962 tmpfile,outfile,keep=
False,logger=logger)
965 """!Runs the GSI and delivers the results.
967 Executes the GSI in a temporary scrub directory, deleting it
968 afterwards if self.scrub is False. Follows this overall
970 1. Make temporary area and cd there
972 3. Run the grab_more_inputs(), which subclasses should override
973 4. Calls make_gsi_namelist() to generate the namelist
974 5. Calls before_gsi() which subclasses should override
975 6. Calls run_gsi_exe() to run the actual GSI program
976 7. Calls after_gsi() which subclasses should override
977 8. Calls deliver_products() to copy files to COM
978 9. Generates diagnostic files.
979 10. Deletes the temporary directory if self.scrub=False"""
982 logger.info(
'Run gsi in directory %s'%(dirname,))
983 if os.path.exists(dirname):
984 logger.info(
'Delete old data in %s'%(dirname,))
985 shutil.rmtree(dirname)
989 if not self.
confbool(
'singleobstest',
False):
994 logger.info(
'This is a single obs test.')
1006 diagpre=self.
confstr(
'diagpre',
'')
1007 diagthreads=self.
confint(
'diagthreads',10)
1008 if diagpre
is None or diagpre==
'':
1009 logger.info(
'GSI diagnostic outputs are disabled.')
1013 self.
state=COMPLETED
1016 """!Links or copies to the local directory any fix or parm
1017 files needed by GSI."""
1019 logger.info(
"Copying fix and parm files")
1024 if self.
confbool(
'sat_radiance_da',
True):
1025 if self.
confbool(
'use_gfs_stratosphere',
True):
1026 anavinfo=s(
'{FIXgsi}/anavinfo_hwrf_L75')
1027 elif (self.
confbool(
'hwrf_43lev_conf',
False,section=
'prelaunch')
or self.
confbool(
'hwrf_other_conf',
False,section=
'prelaunch')):
1028 anavinfo=s(
'{FIXgsi}/anavinfo_hwrf_L42')
1030 anavinfo=s(
'{FIXgsi}/anavinfo_hwrf_L60')
1032 if (self.
confbool(
'hwrf_43lev_conf',
False,section=
'prelaunch')
or self.
confbool(
'hwrf_other_conf',
False,section=
'prelaunch')):
1033 anavinfo=s(
'{FIXgsi}/anavinfo_hwrf_L42_nooz')
1035 anavinfo=s(
'{FIXgsi}/anavinfo_hwrf_L60_nooz')
1036 berror=s(
'{FIXgsi}/nam_glb_berror.f77.gcv')
1037 emiscoef_IRwater=s(
'{FIXcrtm}/EmisCoeff/IR_Water/Big_Endian/Nalli.IRwater.EmisCoeff.bin')
1038 emiscoef_IRice=s(
'{FIXcrtm}/EmisCoeff/IR_Ice/SEcategory/Big_Endian/NPOESS.IRice.EmisCoeff.bin')
1039 emiscoef_IRland=s(
'{FIXcrtm}/EmisCoeff/IR_Land/SEcategory/Big_Endian/NPOESS.IRland.EmisCoeff.bin')
1040 emiscoef_IRsnow=s(
'{FIXcrtm}/EmisCoeff/IR_Snow/SEcategory/Big_Endian/NPOESS.IRsnow.EmisCoeff.bin')
1041 emiscoef_VISice=s(
'{FIXcrtm}/EmisCoeff/VIS_Ice/SEcategory/Big_Endian/NPOESS.VISice.EmisCoeff.bin')
1042 emiscoef_VISland=s(
'{FIXcrtm}/EmisCoeff/VIS_Land/SEcategory/Big_Endian/NPOESS.VISland.EmisCoeff.bin')
1043 emiscoef_VISsnow=s(
'{FIXcrtm}/EmisCoeff/VIS_Snow/SEcategory/Big_Endian/NPOESS.VISsnow.EmisCoeff.bin')
1044 emiscoef_VISwater=s(
'{FIXcrtm}/EmisCoeff/VIS_Water/SEcategory/Big_Endian/NPOESS.VISwater.EmisCoeff.bin')
1045 emiscoef_MWwater=s(
'{FIXcrtm}/EmisCoeff/MW_Water/Big_Endian/FASTEM6.MWwater.EmisCoeff.bin')
1046 aercoef=s(
'{FIXcrtm}/AerosolCoeff/Big_Endian/AerosolCoeff.bin')
1047 cldcoef=s(
'{FIXcrtm}/CloudCoeff/Big_Endian/CloudCoeff.bin')
1048 satinfo=s(
'{FIXgsi}/hwrf_satinfo.txt')
1049 atmsfilter=s(
'{FIXgsi}/atms_beamwidth.txt')
1050 scaninfo=s(
'{FIXgsi}/global_scaninfo.txt')
1051 satangl=s(
'{FIXgsi}/nam_global_satangbias.txt')
1052 pcpinfo=s(
'{FIXgsi}/nam_global_pcpinfo.txt')
1053 ozinfo=s(
'{FIXgsi}/global_ozinfo.txt')
1054 errtable=s(
'{FIXgsi}/hwrf_nam_errtable.r3dv')
1055 convinfo=s(
'{FIXgsi}/hwrf_convinfo.txt')
1058 bufrtable=s(
'{FIXgsi}/prepobs_prep.bufrtable')
1061 bftab_sst=s(
'{FIXgsi}/bufrtab.012')
1064 lnsf(anavinfo,
'./anavinfo')
1065 lnsf(berror,
'./berror_stats')
1066 lnsf(emiscoef_IRwater,
'./Nalli.IRwater.EmisCoeff.bin')
1067 lnsf(emiscoef_IRice,
'./NPOESS.IRice.EmisCoeff.bin')
1068 lnsf(emiscoef_IRsnow,
'./NPOESS.IRsnow.EmisCoeff.bin')
1069 lnsf(emiscoef_IRland,
'./NPOESS.IRland.EmisCoeff.bin')
1070 lnsf(emiscoef_VISice,
'./NPOESS.VISice.EmisCoeff.bin')
1071 lnsf(emiscoef_VISland,
'./NPOESS.VISland.EmisCoeff.bin')
1072 lnsf(emiscoef_VISsnow,
'./NPOESS.VISsnow.EmisCoeff.bin')
1073 lnsf(emiscoef_VISwater,
'./NPOESS.VISwater.EmisCoeff.bin')
1074 lnsf(emiscoef_MWwater,
'./FASTEM6.MWwater.EmisCoeff.bin')
1075 lnsf(aercoef,
'./AerosolCoeff.bin')
1076 lnsf(cldcoef,
'./CloudCoeff.bin')
1078 lnsf(satinfo,
'./satinfo')
1079 lnsf(scaninfo,
'./scaninfo')
1080 lnsf(pcpinfo,
'./pcpinfo')
1081 lnsf(ozinfo,
'./ozinfo')
1082 lnsf(convinfo,
'./convinfo')
1083 lnsf(errtable,
'./errtable')
1084 lnsf(atmsfilter,
'./atms_beamwidth.txt')
1085 lnsf(bufrtable,
'./prepobs_prep.bufrtable')
1086 lnsf(bftab_sst,
'./bftab_sstphr')
1089 with open(
'satinfo',
'rt')
as f:
1092 if not splat
or splat[0][0]==
'!':
continue
1094 spccoeff=s(
'{satsen}.SpcCoeff.bin',satsen=satsen)
1095 if not isnonempty(spccoeff):
1097 s(
'{FIXcrtm}/SpcCoeff/Big_Endian/{spccoeff}',
1099 s(
'{FIXcrtm}/TauCoeff/Big_Endian/{satsen}.TauCoeff.bin',
1101 '.',force=
True,logger=logger)
1104 """!Runs the GSI based on the HWRF FGAT scheme."""
1105 def __init__(self,dstore,conf,section,domain,wrf_in_prod,fgat_in_prods,
1106 sim,cycling_interval=
None,taskname=
None,atime=
None,
1107 enkf_domains=
None,ensda=
None,**kwargs):
1108 """!The FGATGSI constructor:
1109 @param dstore passed to Datum: the Datastore object for this Task
1110 @param conf the conf object for this task (passed to HWRFTask)
1111 @param section the conf section for this task (passed to HWRFTask)
1112 @param domain the WRFDomain for this GSI. Must have been
1113 initialized by a WRFSimulation
1114 @param wrf_in_prod the Product for the wrfinput_d01 or
1115 ghost_d0* file for that domain
1116 @param fgat_in_prods a mapping from analysis time to a product
1117 for all of the FGAT times
1118 @param sim the hwrf.wrf.WRFSimulation that will be run as the forecast
1119 @param cycling_interval Optional: time between HWRF forecast
1121 @param taskname Optional: the taskname for this product in the datastore
1122 @param atime the analysis time as a datetime.datetime.
1124 @param enkf_domains a list of WRF domains that should be copied
1125 from the hwrf.ensda.EnsembleDA.
1126 @param ensda a subclass of hwrf.ensda.DAEnsemble that provides
1127 regional ensemble forecasts to generate the forecast error
1129 @param kwargs ignored; passed to HWRFTask"""
1130 assert(wrf_in_prod
is not None)
1131 if cycling_interval
is None: cycling_interval=6*3600
1133 if atime
is None: atime=conf.cycle
1134 for prod
in fgat_in_prods.itervalues():
1137 for time,prod
in fgat_in_prods.iteritems() ]
1138 assert(wrf_in_prod
is not None)
1139 super(FGATGSI,self).
__init__(dstore,conf,section,domain,wrf_in_prod,
1140 sim,taskname=taskname,atime=atime,enkf_domains=enkf_domains,
1141 ensda=ensda,**kwargs)
1146 """!Iterates over FGAT forecast hours, relative to the parent
1149 Iterates over all FGAT forecast hours (as ints) relative to
1150 the parent analysis time. For example, 3, 6, and 9 for
1151 three-hourly FGAT off of GDAS."""
1154 yield int(round(to_fraction(t-parent_atime)/3600))
1157 """!Iterates over FGAT forecast hours relative to this model.
1159 Iterates over all FGAT forecast hours (as ints) relative to
1160 this model's analysis time. For example, -3, 0, and 3 for
1161 three-hourly FGAT off of GDAS."""
1164 yield int(round(to_fraction(t-atime,negok=
True)/3600))
1167 """!Links to the current working directory gdas native spectral
1168 output files for all FGAT hours."""
1172 ftime = atime + datetime.timedelta(hours=hr)
1173 there=self._in_catalog.locate(
1175 atime=atime,ftime=ftime,logger=logger)
1176 if not isnonempty(there):
1178 '%s %s: required input file is empty or non-existent: %s'
1180 here=
'gfs_sigf%02d'%(hr,)
1181 make_symlink(there,here,force=
True,logger=logger)
1184 """!Copy WRF analysis or input files to this directory.
1186 Copies the WRF analysis or input file to the specified
1187 filename. Also copies the wrf input files for other FGAT
1188 hours to separate files specified by "others". The "others"
1189 must be a format that includes at least one integer argument.
1190 @param filename ignored
1191 @param others a string format with a %d in it, used to
1192 generate output filenames"""
1199 dt=int(round(float((to_fraction(t-atime,negok=
True) + fci)/
1202 logger.debug(
'add prod=%s time=%s'%(repr(p.did),repr(dt)))
1207 for p,n
in names.iteritems():
1208 logger.debug(
'prod %s has name %s'%(repr(p.did),repr(n)))
1210 def namer(p,logger,*args):
return names[p]
1211 def actor(p,name,logger,*args):
1212 deliver_file(p.location,name,logger=logger)
1220 """!Delete the gsi status file.
1222 Deletes all GSI status files, whose paths are determined from the
1223 given config object. If the logger is not specified, the
1224 gsistatus subdomain of the conf default logging domain is used.
1225 @param conf the hwrf.config.HWRFConfig object
1226 @param logger Optional: a logging.Logger for logging."""
1227 if logger
is None: logger=conf.log(
'gsistatus')
1228 for onetwo
in (
'gsistatus',
'gsistatus2' ):
1229 gsistat=conf.get(
'dir',onetwo)
1230 gsistatfile=os.path.join(conf.getdir(
'com'),gsistat)
1234 """!Sets the GSI status files.
1236 Set run_gsi_d02=YES (true) or =NO (false) depending on the
1237 configuration. If the logger is not specified, the gsistatus
1238 subdomain of the conf default logging domain is used.
1239 @param conf the hwrf.config.HWRFConfig object
1240 @param logger Optional: the logging.Logger for log messages."""
1241 if logger
is None: logger=conf.log(
'set gsi status')
1243 tdrflagfile=conf.strinterp(
'dir',
'{com}/{stormlabel}.tdr')
1244 run_gsi=conf.getbool(
'config',
'run_gsi')
1245 conditional_gsid03=conf.getbool(
'config',
'conditional_gsid03',
False)
1246 conditional_gsid02=conf.getbool(
'config',
'conditional_gsid02',
False)
1248 for onetwo
in (
'gsistatus',
'gsistatus2' ):
1249 gsistat=conf.get(
'dir',onetwo)
1250 gsistatfile=os.path.join(conf.getdir(
'com'),gsistat)
1252 gsid02flag=
'YES' if run_gsi
else 'NO'
1253 gsid03flag=
'YES' if run_gsi
else 'NO'
1254 if gsid03flag==
'YES' and conditional_gsid03
and \
1255 not isnonempty(tdrflagfile):
1256 logger.info(
'GSI is disabled for d03 because flag file is '
1257 'empty or does not exist: %s'%(tdrflagfile,))
1259 if gsid02flag==
'YES' and conditional_gsid02
and \
1260 not isnonempty(tdrflagfile):
1261 logger.info(
'GSI is disabled for d02 because flag file is '
1262 'empty or does not exist: %s'%(tdrflagfile,))
1265 logger.info(
'Setting run_gsi_d02=%s in gsi status file %s'%(
1266 gsid02flag,gsistatfile))
1267 logger.info(
'Setting run_gsi_d03=%s in gsi status file %s'%(
1268 gsid03flag,gsistatfile))
1269 with open(gsistatfile,
'wt')
as f:
1270 f.write(
'run_gsi_d02=%s\nrun_gsi_d03=%s\n'%(
1271 gsid02flag,gsid03flag))
1274 """!Checks the gsi status for a specific domain.
1276 Checks the first GSI status file, scanning for information about
1277 the specified domain. If the file does not exist or cannot be
1278 opened or read, then False is returned. Otherwise, the file is
1279 scanned for run_gsi_d02=YES/NO or run_gsi_d03=YES/NO (case
1280 insensitive). The last of those run_gsi_d02/d03 lines is used:
1281 NO=return False, YES=return True.
1282 @param conf the hwrf.config.HWRFConfig object with configuration info
1283 @param domain either "gsi_d02" or "gsi_d03", the domain of interest
1284 @param logger Optional: the logging.Logger for log messages"""
1285 if domain!=
'gsi_d02' and domain!=
'gsi_d03':
1286 raise ValueError(
'In get_gsistatus, domain must be gsi_d02 '
1288 if logger
is None: logger=conf.log(
'get gsi status')
1290 gsistat=conf.get(
'config',
'gsistatus')
1291 gsistatfile=os.path.join(conf.getdir(
'com'),gsistat)
1294 logger.info(
'%s: scan gsi status file for run_%s=YES or NO'%(
1295 gsistatfile,domain))
1298 with open(gsistatfile,
'rt')
as f:
1300 if line.find(
'run_%s=YES'%(domain))>=0:
1303 'gsi status file says: run_%s=YES'%(domain))
1304 elif line.find(
'run_%s=NO'%(domain))>=0:
1307 'gsi status file says: run_%s=NO'%(domain))
1308 except EnvironmentError
as e:
1310 'Error checking gsi status file: %s'
1311 %(str(e),),exc_info=
True)
1312 except Exception
as ee:
1314 'Unhandled exception while checking gsi status file: %s'
1315 %(str(ee),),exc_info=
True)
1318 if gsi_success
is None:
1319 logger.warning(
'Could not scan gsi status file for run_%s=YES'
1320 'or NO. Assuming run_%s=NO.'%(domain,domain))
1324 'run_%s=YES: gsi status file says gsi init succeeded.'%(domain))
1327 'run_%s=NO: gsi status file says gsi init failed or was aborted.'
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Create namelists, monitor wrf simulations, generate filenames.
def netcdfver(filename)
What is the NetCDF version of this file?
def wrfout_copier(self, file)
Generate the wrfout file converter.
cycling_interval
the time between forecast cycles, a datetime.timedelta
def make_diag_files(self, tgtpre, nthreads)
Creates GSI diagnostic files.
def set_ensda(self, ensda, enkf_domains)
Sets the hwrf.ensda.DAEnsemble to use, and the enkf domains.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def _make_ensemble_spread(self, tgtpre, logger)
def grab_bias_satang(self)
Copies or links bias correction and satellite angle files.
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
def wait_for_products
Waits for products to be available and performs an action on them.
def grab_more_inputs(self)
Links to the current working directory gdas native spectral output files for all FGAT hours...
def remove_file
Deletes the specified file.
def grab_gfs_enkf(self, atime=None, kwargs)
Links the GFS ENKF files to this directory.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
def domain(self)
The WRF domain for which GSI is being run.
def wrf_top_Pa(self)
The WRF model top in pascals.
def atime(self)
The analysis time of this GSI.
def grab_obstype_section
Copies or links observations.
Runs the GSI based on the HWRF FGAT scheme.
def unset_gsistatus
Delete the gsi status file.
global_ensemble_size
The number of members in the global (parent model) ensemble.
def confbool
Alias for self.conf.getbool for section self.section.
def get_gsistatus
Checks the gsi status for a specific domain.
section
The confsection in self.section for this HWRFTask (read-only)
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def make_gsi_namelist
Creates the GSI namelist in the specified file.
def run_gsi_exe(self)
Runs the actual GSI executable.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
A piece of data produced by a Task.
outdir
The directory in which this task should deliver its final output.
Contains the WorkPool class, which maintains pools of threads that perform small tasks.
def get
Alias for self.meta(). Returns the value of the specified metadata key or returns default if it is uns...
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Stores products and tasks in an sqlite3 database file.
def get_wrfanl(self, domain)
Obtain output ghost product for the specified domain.
used_regional_ensemble
Was the regional ensemble used for forecast error covariances?
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
This module allows querying resource usage and limits, as well as setting resource limits...
def parent_atime(self)
Parent model analysis time.
def get_wrfinput(self)
Obtain output wrfinput data for the outermost WRF domain.
def grab_prepbufr(self, atime=None, kwargs)
Links or copies the prepbufr file to the local directory.
def inputiter(self)
Iterate over needed inputs.
def grab_more_inputs(self)
Called by run() to obtain additional inputs before before_gsi()
workdir
The directory in which this task should be run.
def products
Iterates over all output products of this Task.
def confint
Alias for self.conf.getint for section self.section.
def __init__(self, dstore, conf, section, domain, wrf_in_prod, sim, taskname=None, atime=None, parent_atime=None, enkf_domains=None, ensda=None, kwargs)
The GSIBase constructor:
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
def conffloat
Alias for self.conf.getfloat for section self.section.
def before_gsi(self)
Called by run() just before running the gsi program.
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
A pool of threads that perform some list of tasks.
def log
Obtain a logging domain.
Insert config file data into a Fortran namelist file.
def after_gsi(self)
Called by run() after the gsi executable completes.
def conftimestrinterp(self, string, ftime, atime=None, section=None, kwargs)
Alias for self.timestr for backward compatibility.
Base class of anything that runs the GSI.
def copy_wrf_inout
Copies the WRF analysis or input file to the specified filename.
def _make_stdout_anl(self, tgtpre, logger)
Concatenates many "stdout" files into one.
def write_vitals
Writes the tcvitals (from self.storminfo) to the specified file.
def fgat_fhrs(self)
Iterates over FGAT forecast hours relative to this model.
def _make_diag_for(self, tgtpre, dtype, logger)
Generates one diagnostic output file.
def deliver_products(self)
Delivers output products.
Exceptions raised by the hwrf package.
def run(self)
Runs the GSI and delivers the results.
def __init__(self, dstore, conf, section, domain, wrf_in_prod, fgat_in_prods, sim, cycling_interval=None, taskname=None, atime=None, enkf_domains=None, ensda=None, kwargs)
The FGATGSI constructor:
def confstr
Alias for self.conf.getstr for section self.section.
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
def grab_wrf_enkf(self, ensda)
Links the WRF ENKF files to this directory.
def set_gsistatus
Sets the GSI status files.
def get_ghost(self, domain)
Obtain output ghost product for the specified domain.
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
def parent_fhrs(self)
Iterates over FGAT forecast hours, relative to the parent analysis time.
def taskvars(self)
The dict of object-local values used for string substitution.
def realtime(self)
Is this job a real-time forecast job?
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
def grab_enkf_input(self)
Link or copy ensemble inputs.
hybrid_da
Was hybrid 3DVAR-ENKF data assimilation used?
def copy_wrf_inout
Copy WRF analysis or input files to this directory.
def grab_bufr
Link bufr files.
def grab_fix_parm(self)
Links or copies to the local directory any fix or parm files needed by GSI.
def make_symlink
Creates a symbolic link "target" that points to "source".