HWRF  trunk@4391
bufrprep.py
"""!Contains Bufrprep, which converts data tanks to bufr files, and
otherwise preprocesses the prepbufr and bufr files.

The Bufrprep task can dump TDR (tldplr) data from the dcom tanks in
real-time mode, link the observation bufr files GSI needs, and run
optional prepbufr pre-processing programs."""

##@var __all__
# List of symbols to export by "from hwrf.bufrprep import *"
__all__ = ['Bufrprep']
7 
8 import os, shutil
9 import hwrf.hwrftask
10 import datetime
12 
13 from produtil.cd import NamedDir
14 from produtil.fileop import make_symlink, deliver_file, isnonempty, \
15  fortlink
16 from produtil.run import run, exe
17 from hwrf.hwrftask import HWRFTask
18 from hwrf.numerics import to_datetime, to_datetime_rel
19 from hwrf.exceptions import GSIInputError
20 
    """!This is a HWRF task that preprocesses observations in data
    tanks to create bufr files suitable for input to hwrf.gsi classes.
    It also does other preprocessing of bufr and prepbufr files.

    In real-time mode it dumps TDR (tldplr) data from the dcom tanks;
    otherwise it links the bufr files listed in its obstypes option.
    When TDR data is present it checks it against the current storm
    and may write a {stormlabel}.tdr status file to the com directory.
    It can also run one of several prepbufr pre-processing programs
    (see prep_prepbufr)."""
25 
26  def __init__(self,dstore,conf,section,taskname=None,atime=None,
27  **kwargs):
28  """!Bufrprep constructor
29  @param dstore passed to Datum: the Datastore object for this Task
30  @param conf the conf object for this task (passed to HWRFTask)
31  @param section the conf section for this task (passed to HWRFTask)
32  @param taskname Optional: the taskname for this product in the datastore
33  @param atime the analsysis time
34  @param kwargs Additional keyword arguments are passed to the
35  hwrf.hwrftask.HWRFTask.__init__ constructor"""
36  super(Bufrprep,self).__init__(dstore,conf,section,taskname=taskname,
37  **kwargs)
38 
39  taskname=self.taskname
40  if atime is None: atime=conf.cycle
41  self._atime=to_datetime(atime)
42  self._dirname=self.workdir
43  self._stormid='999'
44  # Get the DataCatalog for our data grabbers:
45  self._in_catalog=None
46  incat_name=None
47  if 'in_catalog' in kwargs:
48  ink=kwargs['in_catalog']
49  if isinstance(ink,hwrf.input.DataCatalog):
50  self._in_catalog=ink
51  elif isinstance(ink,basestring):
52  incat_name=ink
53  elif ink is None: pass
54  else:
55  raise TypeError(
56  'In hwrf.bufrprep.Bufrprep.__init__, in_catalog must be None, '
57  'a basestring or a DataCatalog. You provided an object '
58  'of type %s, with value %s.'
59  %(type(ink).__name__,repr(ink)))
60  if self._in_catalog is None:
61  if incat_name is None:
62  incat_name=self.confstr('catalog')
64  self.conf,incat_name,self._atime)
65 
66 
67  def getstormid(self):
68  """!The storm ID."""
69  logger=self.log()
70  atime=self._atime.strftime('%Y%m%d%H')
71  year=int(atime[0:4])
72  basin=self.storminfo.pubbasin2
73  if year <= 2013:
74  self._stormid=self.storminfo.stnum
75  elif basin.upper()=='AL':
76  self._stormid='%s%02d' % ('1',self.storminfo.stnum)
77  elif basin.upper()=='EP':
78  self._stormid='%s%02d' % ('2',self.storminfo.stnum)
79  elif basin.upper()=='CP':
80  self._stormid='%s%02d' % ('3',self.storminfo.stnum)
81  else:
82  self._stormid='999'
83 
84  logger.info('get input storm id %s'%(self._stormid))
85  return self._stormid
86 
87  def grab_bufr(self,atime=None,morevars=None):
88  """!Links or copies all needed bufr files to the local
89  directory.
90  @param atime the analysis time
91  @param morevars passed to hwrf.config.HWRFConfig.get()
92  and similar routines to define string replacements"""
93  olist=self.confstr('obstypes')
94  touched=set()
95  for osection in olist.split(','):
96  trim=osection.strip()
97  if len(trim)>0 and not trim in touched:
98  self.grab_obstype_section(trim,atime=atime,morevars=morevars)
99 
100  def grab_obstype_section(self,section,atime=None,morevars=None):
101  """!Copies or links observations specified in the obstype
102  sections to the current working directory.
103 
104  The section listed in self.section should contain an obstype
105  option, whose value is a comma separated list of section
106  names. This method reads every section in that list. For
107  each section, the option keys are the local directory
108  filenames expected by GSI, while the values are the data type
109  part of the operational filename (ie.: the satwind in
110  gfs.t12z.tm00.satwind.bufr_d). There are a few special keys:
111 
112  * dataset - the name of the dataset for hwrf.input purposes
113  * item - the name of the item for hwrf.input purposes
114  * type - the type of observation: satellite, or anything else.
115  At present, only "satellite" has any special meaning.
116 
117  If the type is "satellite" then the entire section will be
118  skipped if sat_da=False in this task's config section.
119 
120  Once the section is parsed, the files are all linked to this
121  directory.
122 
123  @param section Name of the section to read.
124  @param atime Analysis time.
125  @param morevars A dict for additional string replacements
126  in the hwrf.config.HWRFConfig.get() family of functions."""
127  logger=self.log()
128  if not isinstance(section,basestring): section=str(section)
129 
130  if atime is None:
131  atime=self._atime
132  else:
133  atime=to_datetime_rel(atime,self._atime)
134 
135  dataset=self.conf.get(section,'dataset')
136  item=self.conf.get(section,'item')
137  otype=self.conf.get(section,'type').lower()
138 
139  logger.warning('process obs section %s with dataset=%s item=%s '
140  'type=%s'%(section,dataset,item,otype))
141 
142  obstypes=list()
143  items=self.conf.items(section)
144  otdict=dict( [ (v,k) for k,v in items ] )
145  namer=lambda f,t: otdict[t]
146 
147  for localname,obstype in items:
148  if localname in ['dataset','item','type']: continue
149  obstypes.append(obstype)
150 
151  for obstype in obstypes:
152  logger.warning('Find obstype=%s in dataset=%s item=%s'
153  %(obstype,dataset,item))
154  if not isinstance(obstype,basestring):
155  raise TypeError(
156  'In bufrprep.Bufrprep.link_bufr, the obstypes parameter must '
157  'be an iterable container of basestrings. One of the '
158  'elements was a %s (value %s) instead.'
159  %(type(obstype).__name__,repr(obstype)))
160  there=self._in_catalog.locate(dataset,item,atime=atime,
161  logger=logger,obstype=obstype)
162  if there is None or there=='':
163  msg='%s: Could not find a location for this obstype.'\
164  %(obstype,)
165  logger.warning(msg)
166  elif produtil.fileop.isnonempty(there):
167  bn=os.path.basename(there)
168  on=namer(bn,obstype)
169  make_symlink(there,on,logger=logger,force=True)
170  else:
171  msg='%s: Observation file is empty or non-existant: %s'\
172  %(obstype,there)
173  logger.warning(msg)
174 
175  def grab_prepbufr(self,atime=None,**kwargs):
176  """!Links or copies the prepbufr file to the local directory.
177  @param atime The analysis time.
178  @param kwargs More string substitution variables for the
179  hwrf.config.HWRFConfig family of functions."""
180  if atime is None:
181  atime=self._atime
182  else:
183  atime=to_datetime_rel(atime,self._atime)
184  logger=self.log()
185  bufr_dataset=self.confstr('bufr_dataset')
186  prepbufr_item=self.confstr('prepbufr_item')
187  there=self._in_catalog.locate(bufr_dataset,prepbufr_item,
188  atime=atime,logger=logger,**kwargs)
189  if there is None or there=='':
190  msg='Could not find the prepbufr file (item=%s dataset=%s)' \
191  %(repr(prepbufr_item),repr(bufr_dataset))
192  logger.warning(msg)
193  raise GSIInputError(msg)
194  elif not produtil.fileop.isnonempty(there):
195  msg=there+': is non-existant or empty'
196  logger.error(msg)
197  raise GSIInputError(msg)
198  deliver_file(there,'prepbufr.ALL',keep=True,logger=logger)
199 
200  def tdrdump(self,atime=None,morevars=None):
201  """!Dump TDR data for operational run
202  @param atime The analysis time.
203  @param morevars More string substitution variables for the
204  hwrf.config.HWRFConfig.get() family of functions."""
205  if atime is None:
206  atime=self._atime
207  else:
208  atime=to_datetime_rel(atime,self._atime)
209  logger=self.log()
210  input_catalog=self.conf.get('config','fcst_catalog')
211  dcom=self.conf.get(input_catalog,'dcom','/dcom/us007003')
212  if os.path.isdir(dcom):
213  btime=to_datetime_rel(-24*3600,atime)
214  tank1=os.path.join(dcom,btime.strftime("%Y%m%d"),'b006/xx070')
215  tank2=os.path.join(dcom,atime.strftime("%Y%m%d"),'b006/xx070')
216  logger.info('tank1: %s, tank2: %s'%(tank1,tank2))
217  if isnonempty(tank1) or isnonempty(tank2):
218  run(exe('/nwprod/ush/dumpjb') \
219  [atime.strftime("%Y%m%d%H"), '3.00', 'tldplr'] \
220  .env(TANK=dcom, \
221  DATA=self._dirname))
222  tdrbufr='tldplr.ibm'
223  if isnonempty(tdrbufr):
225  tdrbufr,'tldplrbufr',keep=True,logger=logger)
226  else:
227  logger.info('tank1: %s, tank2: %s not exist'%(tank1,tank2))
228  elif produtil.cluster.name() in ( 'tide', 'gyre' ):
229  raise GSIInputError('dcom is not set in the system.conf [dir] section - aborting')
230  else:
231  logger.info('%s does not exist on %s. This is not wcoss. TDR Bufr'
232  'dump can only be run on WCOSS in real-time mode'%(
233  dcom,produtil.cluster.name()))
234 
235  def prep_prepbufr(self,option):
236  """!pre-process prepbufr data
237 
238  Options:
239  * option 0: make no change
240  * option 1: remove some inner-core data
241  * option 2: flag/unflag mass and dropsonde u, v data
242  * option 3: unflag HS3 dropsonde data"""
243  logger=self.log()
244  fortlink({ 21:"./prepbufr.ALL",
245  51:"./prepbufr"})
246  if option == 1:
247  fprog = 'hwrf_rem_prepbufr_typ_in_circle'
248  prog = self.getexe(fprog)
249  RRADC=self.conffloat('RRADC',0.)
250  cmd = produtil.run.exe(prog).env(RLATC=self.storminfo.lat, \
251  RLONC=self.storminfo.lon, \
252  RRADC=RRADC)
253  elif option == 2:
254  self.write_vitals()
255  fprog = 'hwrf_change_prepbufr_qm_in_circle'
256  prog = self.getexe(fprog)
257  RRADC=self.conffloat('RRADC',0.)
258  RBLDC=self.conffloat('RBLDC',0.)
259  cmd = produtil.run.exe(prog).env(RRADC=RRADC, \
260  RBLDC=RBLDC)
261  elif option == 3:
262  fprog = 'hwrf_change_prepbufr_qm_typ'
263  prog = self.getexe(fprog)
264  cmd = produtil.run.exe(prog)
265 
266  if option > 0 and option <= 3:
267  if self.redirect: cmd = cmd >= log
268  produtil.run.checkrun(cmd,logger=logger)
269  else:
270  logger.info('no greater than 3 option, skip prep_prepbufr')
271 
272  def write_vitals(self,filename='tcvitals',logger=None):
273  """!Writes the tcvitals (from self.storminfo) to the specified
274  file.
275  @param filename Name of the file to write
276  @param logger A logging.Logger for log messages"""
277  if logger is None: logger=self.log()
278  logger.info('Writing tcvitals to %s'%(repr(filename),))
279  with open(filename,'wt') as f:
280  f.write(self.storminfo.as_tcvitals()+"\n")
281  assert(os.path.exists(filename))
282 
283  def run(self):
284  """!Runs and delivers the results."""
285  logger=self.log()
286  try:
287  logger.info('Run bufrprep in directory %s'%(self._dirname,))
288  if os.path.exists(self._dirname):
289  logger.info('Delete old data in %s'%(self._dirname,))
290  shutil.rmtree(self._dirname)
291  with NamedDir(self._dirname,keep=not self.scrub):
292  """dump and precess TDR data"""
293  realtime=bool(self.realtime)
294  if realtime:
295  self.tdrdump()
296  else:
297  self.grab_bufr()
298  if os.path.isfile('tldplrbufr'):
299  self.getstormid()
300  logger.info('storm id %s'%(self._stormid))
301  self.readtdrstmid()
302  self.readtdrtime()
303  self.set_tdrstatus()
304  else:
305  logger.info('Skip read tdr bufr.')
306  """precess prepbufr data"""
307  prepbufrprep=self.confint('prepbufrprep',0)
308  if prepbufrprep > 0:
309  self.grab_prepbufr()
310  self.prep_prepbufr(prepbufrprep)
311  self.deliver_products()
312 
313  except Exception as e:
314  logger.critical('bufrprep failed: '+str(e),exc_info=True)
315  raise
316 
317  def readtdrstmid(self):
318  """!Runs the hwrf_readtdrstmid program."""
319  self.log().info('readtdrstmid')
320  logger=self.log()
321  fprog = 'hwrf_readtdrstmid'
322  prog = self.getexe(fprog)
323 
324  log = '%s/logs/%s_%s.log' %(
325  self._dirname, self.__class__.__name__, fprog)
326  cmd = produtil.run.exe(prog) << self._stormid
327  if self.redirect: cmd = cmd >= log
328  produtil.run.checkrun(cmd,logger=logger)
329 
330  def readtdrtime(self):
331  """!Runs the hwrf_readtdrtime program."""
332  self.log().info('readtdrtime')
333  logger=self.log()
334  fprog = 'hwrf_readtdrtime'
335  prog = self.getexe(fprog)
336 
337  log = '%s/logs/%s_%s.log' %(
338  self._dirname, self.__class__.__name__, fprog)
339  cmd = produtil.run.exe(prog)
340  if self.redirect: cmd = cmd >= log
341  produtil.run.checkrun(cmd,logger=logger)
342 
343  def set_tdrstatus(self):
344  """!Create a TDR status file in com directory"""
345  self.log().info('set_tdrstatus')
346  logger=self.log()
347  stmidout=os.path.join(self._dirname,'stmid.dat')
348  timeout=os.path.join(self._dirname,'tdrflag')
349  rightstorm=isnonempty(stmidout)
350  smalledgedump=isnonempty(timeout)
351  tdrflagfile=os.path.join(self.conf.getdir('com'),self.icstr('{stormlabel}.tdr'))
352  if rightstorm and not smalledgedump:
353  with open(tdrflagfile,'wt') as f:
354  f.write('ASSIMILATE_TDR=YES'+"\n")
355  logger.info('tdr bufr is available for this storm, ASSIMILATE_TDR=YES')
356  elif not rightstorm:
357  logger.info('tdr bufr is not for this storm, ASSIMILATE_TDR=NO')
358  else:
359  logger.info('this tdr bufr file is a small edge dump, ASSIMILATE_TDR=NO')
360 
361  def deliver_products(self,atime=None,**kwargs):
362  """!Delivers output products to the intercom directory.
363  @param atime the analysis time
364  @param kwargs Sent to hwrf.input.DataCatalog.parse()"""
365  if atime is None:
366  atime=self._atime
367  else:
368  atime=to_datetime_rel(atime,self._atime)
369  logger=self.log()
371  if self.confint('prepbufrprep',0) > 0:
372  prepbufr_item=self.confstr('prepbufr_item')
373  there=self._in_catalog.parse(prepbufr_item,
374  atime=atime,logger=logger,**kwargs)
375  it=os.path.join(self.outdir,there)
377  'prepbufr',it,keep=False,logger=logger)
378  if bool(self.realtime) and isnonempty('tldplrbufr'):
379  item=self.conf.get('tdr_new_obstype','item')
380  there=self._in_catalog.parse(item,atime=atime,
381  logger=logger,obstype='tldplr')
382  it=os.path.join(self.outdir,there)
384  'tldplrbufr',it,keep=True,logger=logger)
385  tdrprod=os.path.join(self.conf.getdir('com'),there)
387  'tldplrbufr',tdrprod,keep=False,logger=logger)
388 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
def readtdrstmid(self)
Runs the hwrf_readtdrstmid program.
Definition: bufrprep.py:317
def getstormid(self)
The storm ID.
Definition: bufrprep.py:67
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def redirect(self)
Should subprograms' outputs be redirected to separate files?
Definition: hwrftask.py:190
def set_tdrstatus(self)
Create a TDR status file in com directory.
Definition: bufrprep.py:343
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
def deliver_products(self, atime=None, kwargs)
Delivers output products to the intercom directory.
Definition: bufrprep.py:361
def tdrdump
Dump TDR data for operational run.
Definition: bufrprep.py:200
def run(self)
Runs and delivers the results.
Definition: bufrprep.py:283
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
Definition: run.py:398
def prep_prepbufr(self, option)
pre-process prepbufr data
Definition: bufrprep.py:235
def grab_obstype_section
Copies or links observations specified in the obstype sections to the current working directory...
Definition: bufrprep.py:100
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
def grab_prepbufr(self, atime=None, kwargs)
Links or copies the prepbufr file to the local directory.
Definition: bufrprep.py:175
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
def __init__(self, dstore, conf, section, taskname=None, atime=None, kwargs)
Bufrprep constructor.
Definition: bufrprep.py:27
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
def grab_bufr
Links or copies all needed bufr files to the local directory.
Definition: bufrprep.py:87
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
Time manipulation and other numerical routines.
Definition: numerics.py:1
Provides information about the cluster on which this job is running.
Definition: cluster.py:1
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
def log
Obtain a logging domain.
Definition: hwrftask.py:425
Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp...
Definition: input.py:109
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
Definition: run.py:242
def name()
Synonym for here.name.
Definition: cluster.py:109
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def write_vitals
Writes the tcvitals (from self.storminfo) to the specified file.
Definition: bufrprep.py:272
def readtdrtime(self)
Runs the hwrf_readtdrtime program.
Definition: bufrprep.py:330
Raised when GSI cannot find a required input file.
Definition: exceptions.py:147
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
Definition: hwrftask.py:351
This is a HWRF task that preprocesses observations in data tanks to create bufr files suitable for in...
Definition: bufrprep.py:21