HWRF  trunk@4391
1 #! /usr/bin/env python
3 ##@namespace scripts.exhwrf_output
4 # @brief Delivers data to the COM directory or elsewhere.
5 #
6 # This script implements data delivery functionality specific to the
7 # NCEP HWRF system. It is run after everything except archiving, and it
8 # copies the output of some prior jobs to the COM directory under
9 # different names. In the operational HWRF, this job is also supposed
10 # to email the NOAA Senior Duty Meteorologist the HWRF AFOS file.
11 # However, that functionality is intentionally left out at the moment.
12 #
13 # This job is written specifically for the 2013, 2014 and 2015
14 # operational HWRF systems. It does recognize the run_gsi, run_ocean
15 # and run_relocation options, but it will need to be modified to handle
16 # other configurations. However, this job is optional. If you do not
17 # run this job, the later archiving and graphics jobs will still run
18 # correctly.
20 import os, glob, sys, logging, math
24 from produtil.log import jlogger
26 from hwrf.ensda import read_ensda_flag_file
27 import hwrf_alerts
29 ##The copier is a function that returns either None, or a callable
30 #object suitable for passing into the deliver_file to compress while
31 #copying. It is actually just a pointer to hwrf_expt.wrfcopier.copier,
32 #created in the "main" function after initializing the hwrf_expt
33 #module.
34 copier=None
36 def deliver_multi(sourceglob,target,logger):
37  """!Delivers several files to a target directory, logging messages as it goes
38  @param sourceglob the glob (from glob.glob()) that lists the input files
39  @param target the target directory
40  @param logger a logging.Logger for messages
41  @returns the number of files delivered"""
42  count=0
43  if sourceglob.find('*')>=0 or sourceglob.find('?')>=0:
44  logger.info('Globbing for: %s'%(sourceglob,))
45  for source in glob.glob(sourceglob):
47  source,target,keep=True,
48  logger=logger,copier=copier(source))
49  count+=1
50  else:
52  sourceglob,target,keep=True,
53  logger=logger,copier=copier(sourceglob))
54  count=1
55  return count
57 class Deliverer(object):
58  """!A utility class for delivering files. This implements most of
59  the functionality of the exhwrf_output."""
60  def __init__(self,logger,conf):
61  """!Creates a new Deliverer with the specified logging.Logger
62  and HWRFConfig.
63  @param logger a logging.Logger for log messages
64  @param conf an hwrf.config.HWRFConfig for configuration info"""
65  self.__logger=logger
66  self.__conf=conf
67  self.__failures=0 # Count of the number of failed deliveries
68  self.__morevars=dict() # dict of string format substitutions
69  def log(self,sublog=None):
70  """!Gets the logging.Logger for this Deliverer.
71  @param sublog If "sublog" is provided, a new logging.Logger is
72  created for that subdomain of this Deliverer's logging domain."""
73  if sublog is not None:
74  return logging.getLogger(self.__logger.name+'.'+sublog)
75  return self.__logger
76  def __setitem__(self,key,value):
77  """!Sets a key in an internal dict of values used for string
78  formatting.
79  @param key the string formatting key
80  @param value what to replace the key with in string replacement"""
81  self.__morevars[key]=value
82  def __getitem__(self,key):
83  """!Gets a key from the hash of internal values used for string
84  formatting, or raises KeyError if no such key is found.
85  @param key the desired key"""
86  return self.__morevars[key]
87  @property
88  def failures(self):
89  """!The number of failed deliveries."""
90  return self.__failures
91  def reset(self):
92  """!Resets the number of failures to zero."""
93  self.__failures=0
94  @property
95  def conf(self):
96  """!The hwrf.config.HWRFConfig object for this Deliverer."""
97  return self.__conf
98  def deliver_file(self,workfile,comfile=None,from_com=False,
99  optional=False,**kwargs):
100  """!Delivers one file.
102  Delivers the specified work area file to the specified com
103  location. OR, if from_com=True, it does the opposite: deliver
104  from com to the workfile. If the com location is not
105  specified, a suitable default will be chosen. If
106  optional=True, deliver_file will ignore the file if it is
107  missing. Otherwise, a missing file counts as a failure in the
108  internal failure counter. Additional keyword arguments are
109  sent to conf.strinterp. If the workfile is a relative path,
110  it is relative to the WORKhwrf directory.
112  When delivering to COM (from_com=False, the default), the
113  workfile can instead be a Product, in which case the
114  Product.location and .available are checked for availability
115  information. When delivering from COM (from_com=True), the
116  workfile must be a string path.
117  @param workfile the file in the work area
118  @param comfile the file in the COM directory
119  @param from_com if True, deliver from com to work, if False, do the opposite
120  @param optional if True, and the file is missing, return True
121  @param kwargs additional keyword arguments passed on to produtil.fileop.deliver_file()
122  @returns True on successful delivery, False on delivery
123  failure. If the file could not be delivered, but
124  optional=True, then True is returned."""
125  conf=self.conf
126  logger=self.log()
127  if kwargs:
128  morevars=dict(self.__morevars)
129  morevars.update(kwargs)
130  else:
131  morevars=self.__morevars
132  if not from_com and isinstance(workfile,produtil.datastore.Product):
133  with workfile.dstore.transaction() as t:
134  loc=workfile.location
135  av=workfile.available
136  if loc is None or loc=='':
137  if not optional:
138  logger.error('%s: no location'%(workfile.did,))
139  self.__failures+=1
140  return False
141  else:
142  logger.warning('%s: no location'%(workfile.did,))
143  return
144  elif not av:
145  if not optional:
146  logger.error('%s: not available (location=%s)'%(
147  workfile.did,workfile.location))
148  self.__failures+=1
149  return False
150  else:
151  logger.warning('%s: not available (location=%s)'%(
152  workfile.did,workfile.location))
153  return
154  else:
155  workpath=workfile.location
156  elif from_com:
157  assert(isinstance(workfile,basestring))
158  workpath=conf.strinterp('config',workfile,**morevars)
159  logger.info('from_com workpath='+workpath)
160  else:
161  assert(isinstance(workfile,basestring))
162  workpath=os.path.join(
163  conf.getdir('WORKhwrf'),
164  conf.strinterp('config',workfile,**morevars) )
166  if comfile is None:
167  if from_com:
168  raise AssertionError('When copying from com, the comfile '
169  'must be specified.')
170  compath=os.path.join(
171  conf.getdir('com'),
172  conf.strinterp('config','{out_prefix}.{workbasename}',
173  workbasename=os.path.basename(workpath)))
174  else:
175  compath=os.path.join(
176  conf.getdir('com'),
177  conf.strinterp('config',comfile,**morevars) )
178  logger.info('compath is '+compath)
179  if from_com and optional and not os.path.exists(compath):
180  logger.info('Optional file does not exist: '+repr(compath))
181  return
182  elif not from_com and optional and not os.path.exists(workpath):
183  logger.info('Optional file does not exist: '+repr(workpath))
184  return
185  try:
186  if from_com:
187  frompath=compath
188  topath=workpath
189  else:
190  frompath=workpath
191  topath=compath
192  logger.info('deliver %s => %s'%(frompath,topath))
193  count=deliver_multi(frompath,topath,logger=logger)
194  except EnvironmentError as e:
195  if optional:
196  logger.warning('%s: cannot deliver: %s'%(workpath,str(e)))
197  else:
198  logger.error('%s: cannot deliver: %s'%(workpath,str(e)),
199  exc_info=True)
200  self.__failures+=1
201  return False
202  if count==0:
203  if optional:
204  logger.warning('%s: no files matched this glob. This '
205  'file was optional, so I will continue.'
206  %(frompath,))
207  return False
208  else:
209  logger.error('%s: no files matched this glob.'%(frompath,))
210  self.__failures+=1
211  return False
212  return True
214 ########################################################################
216 def main():
217  """!Delivers HWRF files to COM"""
218  import hwrf_expt
219  hwrf_expt.init_module(make_ensemble_da=True)
220  # Make sure DBN alerts and other such things are triggered:
225  global copier
226  copier=hwrf_expt.wrfcopier.compression_copier
228  if 'NO' == os.environ.get('PARAFLAG','YES'):
229  jlogger.info('Calling email_afos_to_sdm from output job to email the track.')
230  afos=hwrf_expt.nhcp.product('afos')
232  jlogger.info('Done with email_afos_to_sdm. Will now celebrate by delivering many things to COM.')
234  conf=hwrf_expt.conf
235  relocation=conf.getbool('config','run_relocation',True)
236  ocean=conf.getstr('config','ocean_model')
237  coupled=conf.getbool('config','run_ocean',True)
238  GSI=conf.getbool('config','run_gsi')
239  run_ensemble_da=conf.getbool('config','run_ensemble_da',False)
240  extra_trackers=conf.getbool('config','extra_trackers',False)
241  fcstlen=conf.getint('config','forecast_length',126)
243  ocstatus=hwrf_expt.ocstatus
244  logger=conf.log('output')
246  if coupled and not ocstatus.get(logger):
247  coupled=False
249  hwrf_expt.wrfcopier.run()
251  D=Deliverer(logger,conf)
252  D['wrfdir']=hwrf_expt.runwrf.workdir
253  D.deliver_file('{WORKhwrf}/tmpvit','{out_prefix}.storm_vit')
255  if GSI:
256  D['gsi_d02']=hwrf_expt.gsi_d02.outdir
257  if hwrf_expt.gsid03_flag:
258  D['gsi_d03']=hwrf_expt.gsi_d03.outdir
260  # Multistorm - jtf
261  logger.info('WRF run directory is %s'%(repr(D['wrfdir']),))
263  D.deliver_file('{WORKhwrf}/jlogfile',optional=True)
265  d01=hwrf_expt.moad
266  d02=hwrf_expt.storm1outer
267  d03=hwrf_expt.storm1inner
269  if coupled and ocean=='POM':
270  D.deliver_file('{wrfdir}/MDstatus',optional=True)
271  for ocrest in ('el_initial.nc', 'grid.nc', 'ts_clim.nc',
272  'ts_initial.nc', 'uv_initial.nc' ):
273  D.deliver_file('{wrfdir}/{vit[stormname]}.{ocrest}',
274  '{out_prefix}.pom.{ocrest}', ocrest=ocrest)
276  for iday in xrange(int(math.floor(fcstlen/24.0+0.01))):
277  ocrest="%04d.nc"%iday
278  D.deliver_file('{wrfdir}/{vit[stormname]}.{ocrest}',
279  '{out_prefix}.pom.{ocrest}', ocrest=ocrest)
281  logcount=0
282  for ext in ('log','out','err'):
283  globme=conf.getdir('WORKhwrf')+'/*.'+ext
284  logger.info('Globbing for %s log files'%(globme,))
285  for log in glob.glob(globme):
286  logcount+=1
287  D.deliver_file(log)
288  logger.info('Found %d log file(s)'%(logcount,))
290  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
291  # Deliver GSI stuff next.
292  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
293  gsi_flag=conf.getbool('config','run_gsi')
294  gsiop=True
295  if gsi_flag:
296  gsiop=((not hwrf_expt.gsi_d02.completed) or
297  (not hwrf_expt.gsi_d03.completed))
298  if gsiop:
299  logger.warning('GSI failed, so all GSI products are optional.')
300  else:
301  logger.info('GSI ran, so its products are mandatory.')
302  if GSI:
303  # Copy the original wrfinput file before DA:
304  org_d01=hwrf_expt.gfs_init.realinit.wrfinput_at_time(
305  hwrf_expt.cycle,d01)
306  D.deliver_file(org_d01,'{out_prefix}.wrforg_d01',optional=gsiop)
308  if GSI:
309  # Get the FGAT initialization at the analysis time:
310  ceninit=hwrf_expt.fgat_init.init_at_time(hwrf_expt.conf.cycle)
312  # Copy the original wrfanl files before relocation:
313  org_d02=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
314  org_d03=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
315  D.deliver_file(org_d02,'{out_prefix}.wrforg_d02',optional=gsiop)
316  D.deliver_file(org_d03,'{out_prefix}.wrforg_d03',optional=gsiop)
318  if relocation:
319  # Copy the wrfanl files after relocation, but before GSI:
320  ges_d02=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
321  ges_d03=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
322  D.deliver_file(ges_d02,'{out_prefix}.wrfges_d02',optional=gsiop)
323  D.deliver_file(ges_d03,'{out_prefix}.wrfges_d03',optional=gsiop)
326  if GSI:
327  D.deliver_file('{gsi_d02}/satbias_out',
328  '{out_prefix}.gsi_cvs2.biascr',optional=gsiop)
329  if hwrf_expt.gsid03_flag:
330  D.deliver_file('{gsi_d03}/satbias_out',
331  '{out_prefix}.gsi_cvs3.biascr',optional=gsiop)
333  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
334  # Lastly, deliver the diag files
335  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
336  jlogger.info('Delivering wrfdiag files to com.')
337  hwrf_expt.nhcp.deliver_wrfdiag()
339  if D.failures>0:
340  jlogger.critical('HWRF: unable to deliver %d non-optional products to com.'%int(D.failures))
341  sys.exit(1)
343  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
344  # Deliver things to noscrub for non-NCO runs # # # # # # # # # # #
345  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
346  if conf.getbool('config','PARAFLAG'):
347  logger.info('You are not NCO, so I will deliver files to noscrub.')
348  else:
349  logger.info('You are NCO so I will skip NOSCRUB deliveries.')
351  D.reset()
352  def fromcom(workpath,compath,optional=False):
353  D.deliver_file(workpath,compath,from_com=True,optional=optional)
355  def havedir(sdir):
356  there=conf.get('dir',sdir,'NOPE')
357  if there=='NOPE':
358  return False
360  return True
362  if havedir('outatcf'):
363  fromcom('{outatcf}','{out_prefix}.trak.hwrf.atcfunix')
364  if havedir('outdiag'):
365  fromcom('{outdiag}','{out_prefix}.trak.hwrf.3hourly*')
366  fromcom('{outdiag}','{out_prefix}*resolution',True)
367  fromcom('{outdiag}','{out_prefix}*htcf*stats',True)
368  fromcom('{outdiag}','{out_prefix}*htcf',True)
369  fromcom('{outdiag}','a*.dat')
370  fromcom('{outdiag}','{out_prefix}.stats.tpc',optional=True)
371  if extra_trackers:
372  fromcom('{outdiag}','{com}/{out_prefix}.trak.hwrfd01.atcfunix')
373  fromcom('{outdiag}','{com}/{out_prefix}.trak.hwrfd02.atcfunix')
374  if havedir('outships'):
375  fromcom('{outships}','figures/*.txt',optional=True)
376  if havedir('outstatus'):
377  fromcom('{outstatus}','{WORKhwrf}/submit.out',optional=True)
378  timings=conf.strinterp('config','{outstatus}/{out_prefix}.timings')
379  inout=conf.strinterp('config','{WORKhwrf}/hwrf_*.out')
380  with open(timings,'wt') as outf:
381  for inoutfile in glob.glob(inout):
382  if not os.path.exists(inoutfile):
383  logger.warning('%s: file does not exist; skipping'
384  %(inoutfile,))
385  with open(inoutfile,'rt') as inf:
386  for line in inf:
387  if line.find('TIMING')>=0:
388  print>>outf,line.rstrip()
389  if havedir('outatcfcorrected'):
390  inatcf=conf.strinterp('config',
391  '{com}/{out_prefix}.trak.hwrf.atcfunix')
392  outatcf=conf.strinterp(
393  'config','{outatcfcorrected}/{out_prefix}.trak.hwrf.atcfunix')
394  hwrf.tracker.jtwc_rewrite(inatcf,outatcf,logger)
396  ####################################################################
398  # Create the "done file" if ensda is entirely disabled. This is
399  # used by the workflow layer to know when the cycle is entirely
400  # complete, and can be deleted.
402  # NOTE FOR FUTURE DEVELOPMENT: When the graphics are added to the
403  # workflow, we will need to move the creation of this "done file"
404  # to a later step, after the graphics. The logical candidate
405  # would be a new job whose purpose is to check the cycle's entire
406  # workflow to make sure it is finished.
407  make_done=True
408  if run_ensemble_da:
409  flag_file=conf.strinterp('tdrcheck','{tdr_flag_file}')
410  try:
411  ensda_flag=hwrf.ensda.read_ensda_flag_file(flag_file)
412  except (EnvironmentError) as e:
413  logger.error('%s: unable to get ensda_flag; assume False: %s'%(
414  flag_file,str(e)),exc_info=True)
415  ensda_flag=False
417 ########################################################################
419 if __name__=='__main__':
420  try:
422  jlogger.info('hwrf_output is starting')
423  main()
424  jlogger.info('hwrf_output has completed')
425  except Exception as e:
426  jlogger.critical('hwrf_output is aborting: '+str(e),exc_info=True)
427  sys.exit(2)
