1 """!Contains Bufrprep, which converts data tanks to bufr files, and
2 otherwise preprocesses the prepbufr and bufr files."""
22 """!This is a HWRF task that preprocesses observations in data
23 tanks to create bufr files suitable for input to hwrf.gsi classes.
24 It also does other preprocessing of bufr and prepbufr files."""
26 def __init__(self,dstore,conf,section,taskname=None,atime=None,
28 """!Bufrprep constructor
29 @param dstore passed to Datum: the Datastore object for this Task
30 @param conf the conf object for this task (passed to HWRFTask)
31 @param section the conf section for this task (passed to HWRFTask)
32 @param taskname Optional: the taskname for this product in the datastore
33 @param atime the analysis time
34 @param kwargs Additional keyword arguments are passed to the
35 hwrf.hwrftask.HWRFTask.__init__ constructor"""
36 super(Bufrprep,self).
__init__(dstore,conf,section,taskname=taskname,
40 if atime
is None: atime=conf.cycle
41 self.
_atime=to_datetime(atime)
47 if 'in_catalog' in kwargs:
48 ink=kwargs[
'in_catalog']
51 elif isinstance(ink,basestring):
53 elif ink
is None:
pass
56 'In hwrf.bufrprep.Bufrprep.__init__, in_catalog must be None, '
57 'a basestring or a DataCatalog. You provided an object '
58 'of type %s, with value %s.'
59 %(type(ink).__name__,repr(ink)))
61 if incat_name
is None:
62 incat_name=self.
confstr(
'catalog')
70 atime=self._atime.strftime(
'%Y%m%d%H')
72 basin=self.storminfo.pubbasin2
75 elif basin.upper()==
'AL':
76 self.
_stormid=
'%s%02d' % (
'1',self.storminfo.stnum)
77 elif basin.upper()==
'EP':
78 self.
_stormid=
'%s%02d' % (
'2',self.storminfo.stnum)
79 elif basin.upper()==
'CP':
80 self.
_stormid=
'%s%02d' % (
'3',self.storminfo.stnum)
84 logger.info(
'get input storm id %s'%(self.
_stormid))
88 """!Links or copies all needed bufr files to the local
90 @param atime the analysis time
91 @param morevars passed to hwrf.config.HWRFConfig.get()
92 and similar routines to define string replacements"""
95 for osection
in olist.split(
','):
97 if len(trim)>0
and not trim
in touched:
101 """!Copies or links observations specified in the obstype
102 sections to the current working directory.
104 The section listed in self.section should contain an obstype
105 option, whose value is a comma separated list of section
106 names. This method reads every section in that list. For
107 each section, the option keys are the local directory
108 filenames expected by GSI, while the values are the data type
109 part of the operational filename (i.e., the satwind in
110 gfs.t12z.tm00.satwind.bufr_d). There are a few special keys:
112 * dataset - the name of the dataset for hwrf.input purposes
113 * item - the name of the item for hwrf.input purposes
114 * type - the type of observation: satellite, or anything else.
115 At present, only "satellite" has any special meaning.
117 If the type is "satellite" then the entire section will be
118 skipped if sat_da=False in this task's config section.
120 Once the section is parsed, the files are all linked to this
123 @param section Name of the section to read.
124 @param atime Analysis time.
125 @param morevars A dict for additional string replacements
126 in the hwrf.config.HWRFConfig.get() family of functions."""
128 if not isinstance(section,basestring): section=str(section)
133 atime=to_datetime_rel(atime,self.
_atime)
135 dataset=self.conf.get(section,
'dataset')
136 item=self.conf.get(section,
'item')
137 otype=self.conf.get(section,
'type').lower()
139 logger.warning(
'process obs section %s with dataset=%s item=%s '
140 'type=%s'%(section,dataset,item,otype))
143 items=self.conf.items(section)
144 otdict=dict( [ (v,k)
for k,v
in items ] )
145 namer=
lambda f,t: otdict[t]
147 for localname,obstype
in items:
148 if localname
in [
'dataset',
'item',
'type']:
continue
149 obstypes.append(obstype)
151 for obstype
in obstypes:
152 logger.warning(
'Find obstype=%s in dataset=%s item=%s'
153 %(obstype,dataset,item))
154 if not isinstance(obstype,basestring):
156 'In bufrprep.Bufrprep.link_bufr, the obstypes parameter must '
157 'be an iterable container of basestrings. One of the '
158 'elements was a %s (value %s) instead.'
159 %(type(obstype).__name__,repr(obstype)))
160 there=self._in_catalog.locate(dataset,item,atime=atime,
161 logger=logger,obstype=obstype)
162 if there
is None or there==
'':
163 msg=
'%s: Could not find a location for this obstype.'\
167 bn=os.path.basename(there)
169 make_symlink(there,on,logger=logger,force=
True)
171 msg=
'%s: Observation file is empty or non-existant: %s'\
176 """!Links or copies the prepbufr file to the local directory.
177 @param atime The analysis time.
178 @param kwargs More string substitution variables for the
179 hwrf.config.HWRFConfig family of functions."""
183 atime=to_datetime_rel(atime,self.
_atime)
185 bufr_dataset=self.
confstr(
'bufr_dataset')
186 prepbufr_item=self.
confstr(
'prepbufr_item')
187 there=self._in_catalog.locate(bufr_dataset,prepbufr_item,
188 atime=atime,logger=logger,**kwargs)
189 if there
is None or there==
'':
190 msg=
'Could not find the prepbufr file (item=%s dataset=%s)' \
191 %(repr(prepbufr_item),repr(bufr_dataset))
195 msg=there+
': is non-existant or empty'
198 deliver_file(there,
'prepbufr.ALL',keep=
True,logger=logger)
201 """!Dump TDR data for operational run
202 @param atime The analysis time.
203 @param morevars More string substitution variables for the
204 hwrf.config.HWRFConfig.get() family of functions."""
208 atime=to_datetime_rel(atime,self.
_atime)
210 input_catalog=self.conf.get(
'config',
'fcst_catalog')
211 dcom=self.conf.get(input_catalog,
'dcom',
'/dcom/us007003')
212 if os.path.isdir(dcom):
213 btime=to_datetime_rel(-24*3600,atime)
214 tank1=os.path.join(dcom,btime.strftime(
"%Y%m%d"),
'b006/xx070')
215 tank2=os.path.join(dcom,atime.strftime(
"%Y%m%d"),
'b006/xx070')
216 logger.info(
'tank1: %s, tank2: %s'%(tank1,tank2))
217 if isnonempty(tank1)
or isnonempty(tank2):
218 run(exe(
'/nwprod/ush/dumpjb') \
219 [atime.strftime(
"%Y%m%d%H"),
'3.00',
'tldplr'] \
223 if isnonempty(tdrbufr):
225 tdrbufr,
'tldplrbufr',keep=
True,logger=logger)
227 logger.info(
'tank1: %s, tank2: %s not exist'%(tank1,tank2))
229 raise GSIInputError(
'dcom is not set in the system.conf [dir] section - aborting')
231 logger.info(
'%s does not exist on %s. This is not wcoss. TDR Bufr'
232 'dump can only be run on WCOSS in real-time mode'%(
236 """!pre-process prepbufr data
239 * option 0: make no change
240 * option 1: remove some inner-core data
241 * option 2: flag/unflag mass and dropsonde u, v data
242 * option 3: unflag HS3 dropsonde data"""
244 fortlink({ 21:
"./prepbufr.ALL",
247 fprog =
'hwrf_rem_prepbufr_typ_in_circle'
251 RLONC=self.storminfo.lon, \
255 fprog =
'hwrf_change_prepbufr_qm_in_circle'
262 fprog =
'hwrf_change_prepbufr_qm_typ'
266 if option > 0
and option <= 3:
270 logger.info(
'no greater than 3 option, skip prep_prepbufr')
273 """!Writes the tcvitals (from self.storminfo) to the specified
275 @param filename Name of the file to write
276 @param logger A logging.Logger for log messages"""
277 if logger
is None: logger=self.
log()
278 logger.info(
'Writing tcvitals to %s'%(repr(filename),))
279 with open(filename,
'wt')
as f:
280 f.write(self.storminfo.as_tcvitals()+
"\n")
281 assert(os.path.exists(filename))
284 """!Runs and delivers the results."""
287 logger.info(
'Run bufrprep in directory %s'%(self.
_dirname,))
289 logger.info(
'Delete old data in %s'%(self.
_dirname,))
292 """dump and process TDR data"""
298 if os.path.isfile(
'tldplrbufr'):
300 logger.info(
'storm id %s'%(self.
_stormid))
305 logger.info(
'Skip read tdr bufr.')
306 """process prepbufr data"""
307 prepbufrprep=self.
confint(
'prepbufrprep',0)
313 except Exception
as e:
314 logger.critical(
'bufrprep failed: '+str(e),exc_info=
True)
318 """!Runs the hwrf_readtdrstmid program."""
319 self.
log().info(
'readtdrstmid')
321 fprog =
'hwrf_readtdrstmid'
324 log =
'%s/logs/%s_%s.log' %(
325 self.
_dirname, self.__class__.__name__, fprog)
331 """!Runs the hwrf_readtdrtime program."""
332 self.
log().info(
'readtdrtime')
334 fprog =
'hwrf_readtdrtime'
337 log =
'%s/logs/%s_%s.log' %(
338 self.
_dirname, self.__class__.__name__, fprog)
344 """!Create a TDR status file in com directory"""
345 self.
log().info(
'set_tdrstatus')
347 stmidout=os.path.join(self.
_dirname,
'stmid.dat')
348 timeout=os.path.join(self.
_dirname,
'tdrflag')
349 rightstorm=isnonempty(stmidout)
350 smalledgedump=isnonempty(timeout)
351 tdrflagfile=os.path.join(self.conf.getdir(
'com'),self.
icstr(
'{stormlabel}.tdr'))
352 if rightstorm
and not smalledgedump:
353 with open(tdrflagfile,
'wt')
as f:
354 f.write(
'ASSIMILATE_TDR=YES'+
"\n")
355 logger.info(
'tdr bufr is available for this storm, ASSIMILATE_TDR=YES')
357 logger.info(
'tdr bufr is not for this storm, ASSIMILATE_TDR=NO')
359 logger.info(
'this tdr bufr file is a small edge dump, ASSIMILATE_TDR=NO')
362 """!Delivers output products to the intercom directory.
363 @param atime the analysis time
364 @param kwargs Sent to hwrf.input.DataCatalog.parse()"""
368 atime=to_datetime_rel(atime,self.
_atime)
371 if self.
confint(
'prepbufrprep',0) > 0:
372 prepbufr_item=self.
confstr(
'prepbufr_item')
373 there=self._in_catalog.parse(prepbufr_item,
374 atime=atime,logger=logger,**kwargs)
375 it=os.path.join(self.
outdir,there)
377 'prepbufr',it,keep=
False,logger=logger)
378 if bool(self.
realtime)
and isnonempty(
'tldplrbufr'):
379 item=self.conf.get(
'tdr_new_obstype',
'item')
380 there=self._in_catalog.parse(item,atime=atime,
381 logger=logger,obstype=
'tldplr')
382 it=os.path.join(self.
outdir,there)
384 'tldplrbufr',it,keep=
True,logger=logger)
385 tdrprod=os.path.join(self.conf.getdir(
'com'),there)
387 'tldplrbufr',tdrprod,keep=
False,logger=logger)
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
def readtdrstmid(self)
Runs the hwrf_readtdrstmid program.
def getstormid(self)
The storm ID.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def redirect(self)
Should subprograms' outputs be redirected to separate files?
def set_tdrstatus(self)
Create a TDR status file in com directory.
taskname
Read-only property: the name of this task.
The base class of tasks run by the HWRF system.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
def deliver_products(self, atime=None, **kwargs)
Delivers output products to the intercom directory.
def tdrdump
Dump TDR data for operational run.
def run(self)
Runs and delivers the results.
def checkrun(arg, logger=None, **kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
def prep_prepbufr(self, option)
pre-process prepbufr data
def grab_obstype_section
Copies or links observations specified in the obstype sections to the current working directory...
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
def grab_prepbufr(self, atime=None, **kwargs)
Links or copies the prepbufr file to the local directory.
outdir
The directory in which this task should deliver its final output.
def __init__(self, dstore, conf, section, taskname=None, atime=None, **kwargs)
Bufrprep constructor.
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
def grab_bufr
Links or copies all needed bufr files to the local directory.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
Provides information about the cluster on which this job is running.
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
def conffloat
Alias for self.conf.getfloat for section self.section.
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
def log
Obtain a logging domain.
Exceptions raised by the hwrf package.
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
def name()
Synonym for here.name.
def confstr
Alias for self.conf.getstr for section self.section.
def write_vitals
Writes the tcvitals (from self.storminfo) to the specified file.
def readtdrtime(self)
Runs the hwrf_readtdrtime program.
def realtime(self)
Is this job a real-time forecast job?
def icstr(self, string, section=None, **kwargs)
Expands a string in the given conf section.
This is a HWRF task that preprocesses observations in data tanks to create bufr files suitable for in...