HWRF  trunk@4391
exhwrf_output.py
1 #! /usr/bin/env python
2 
3 ##@namespace scripts.exhwrf_output
4 # @brief Delivers data to the COM directory or elsewhere.
5 #
6 # This script implements data delivery functionality specific to the
7 # NCEP HWRF system. It is run after everything except archiving, and it
8 # copies the output of some prior jobs to the COM directory under
9 # different names. In the operational HWRF, this job is also supposed
10 # to email the NOAA Senior Duty Meteorologist the HWRF AFOS file.
11 # However, that functionality is intentionally left out at the moment.
12 #
13 # This job is written specifically for the 2013, 2014 and 2015
14 # operational HWRF systems. It does recognize the run_gsi, run_ocean
15 # and run_relocation options, but it will need to be modified to handle
16 # other configurations. However, this job is optional. If you do not
17 # run this job, the later archiving and graphics jobs will still run
18 # correctly.
19 
20 import os, glob, sys, logging, math
22 
24 from produtil.log import jlogger
26 from hwrf.ensda import read_ensda_flag_file
27 import hwrf_alerts
28 
##The copier is a function that returns either None, or a callable
#object suitable for passing into the deliver_file to compress while
#copying. It is actually just a pointer to
#hwrf_expt.wrfcopier.compression_copier, assigned in the "main"
#function after initializing the hwrf_expt module.
34 copier=None
35 
def deliver_multi(sourceglob,target,logger):
    """!Delivers several files to a target directory, logging messages as it goes

    If sourceglob contains a "*" or "?" it is expanded with
    glob.glob() and every match is delivered.  Otherwise sourceglob is
    treated as a single path and delivered as-is.  Every delivery is
    made with keep=True.

    @param sourceglob the glob (from glob.glob()) that lists the input files
    @param target the target directory
    @param logger a logging.Logger for messages
    @returns the number of files delivered; always 1 when sourceglob
      has no wildcard"""
    def compressor_for(path):
        # The module-level "copier" is None until main() installs one.
        # Pass None through instead of trying to call it.
        if copier is None:
            return None
        return copier(path)

    if '*' in sourceglob or '?' in sourceglob:
        logger.info('Globbing for: %s'%(sourceglob,))
        count=0
        for source in glob.glob(sourceglob):
            produtil.fileop.deliver_file(
                source,target,keep=True,
                logger=logger,copier=compressor_for(source))
            count+=1
        return count

    produtil.fileop.deliver_file(
        sourceglob,target,keep=True,
        logger=logger,copier=compressor_for(sourceglob))
    return 1
56 
class Deliverer(object):
    """!A utility class for delivering files. This implements most of
    the functionality of the exhwrf_output.

    A Deliverer keeps a count of failed non-optional deliveries and a
    dict of extra string-substitution values (set and read through
    the [] operator) that are passed to conf.strinterp when file names
    are expanded."""
    def __init__(self,logger,conf):
        """!Creates a new Deliverer with the specified logging.Logger
        and HWRFConfig.
        @param logger a logging.Logger for log messages
        @param conf an hwrf.config.HWRFConfig for configuration info"""
        self.__logger=logger
        self.__conf=conf
        self.__failures=0 # Count of the number of failed deliveries
        self.__morevars=dict() # dict of string format substitutions

    def log(self,sublog=None):
        """!Gets the logging.Logger for this Deliverer.
        @param sublog If "sublog" is provided, a new logging.Logger is
          created for that subdomain of this Deliverer's logging domain.
        @returns this Deliverer's logger, or the logger named
          "<this logger's name>.<sublog>" if sublog is given"""
        if sublog is None:
            return self.__logger
        return logging.getLogger(self.__logger.name+'.'+sublog)

    def __setitem__(self,key,value):
        """!Sets a key in an internal dict of values used for string
        formatting.
        @param key the string formatting key
        @param value what to replace the key with in string replacement"""
        self.__morevars[key]=value

    def __getitem__(self,key):
        """!Gets a key from the hash of internal values used for string
        formatting, or raises KeyError if no such key is found.
        @param key the desired key"""
        return self.__morevars[key]

    @property
    def failures(self):
        """!The number of failed deliveries."""
        return self.__failures

    def reset(self):
        """!Resets the number of failures to zero."""
        self.__failures=0

    @property
    def conf(self):
        """!The hwrf.config.HWRFConfig object for this Deliverer."""
        return self.__conf

    def deliver_file(self,workfile,comfile=None,from_com=False,
                     optional=False,**kwargs):
        """!Delivers one file.

        Delivers the specified work area file to the specified com
        location. OR, if from_com=True, it does the opposite: deliver
        from com to the workfile. If the com location is not
        specified, "{out_prefix}." plus the basename of the work file
        is used. If optional=True, deliver_file will ignore the file
        if it is missing. Otherwise, a missing file counts as a
        failure in the internal failure counter. Additional keyword
        arguments are sent to conf.strinterp. If the workfile is a
        relative path, it is relative to the WORKhwrf directory.

        When delivering to COM (from_com=False, the default), the
        workfile can instead be a Product, in which case the
        Product.location and .available are checked for availability
        information. When delivering from COM (from_com=True), the
        workfile must be a string path.

        The source path may contain "*" or "?" wildcards, in which
        case every match is delivered.

        @param workfile the file in the work area
        @param comfile the file in the COM directory
        @param from_com if True, deliver from com to work, if False, do the opposite
        @param optional if True, and the file is missing, return True
        @param kwargs additional string substitutions for conf.strinterp
        @returns True on successful delivery, False on delivery
          failure. If the file could not be delivered, but
          optional=True, then True is returned.
        @throws AssertionError if from_com=True and no comfile is given"""
        conf=self.conf
        logger=self.log()

        # Per-call substitutions override the stored ones without
        # modifying the stored dict.
        if kwargs:
            morevars=dict(self.__morevars)
            morevars.update(kwargs)
        else:
            morevars=self.__morevars

        if not from_com and isinstance(workfile,produtil.datastore.Product):
            # Read location and availability inside one datastore
            # transaction, then work only with that snapshot.
            with workfile.dstore.transaction():
                loc=workfile.location
                av=workfile.available
            if loc is None or loc=='':
                problem='%s: no location'%(workfile.did,)
            elif not av:
                problem='%s: not available (location=%s)'%(workfile.did,loc)
            else:
                problem=None
            if problem is not None:
                if optional:
                    logger.warning(problem)
                    return True
                logger.error(problem)
                self.__failures+=1
                return False
            workpath=loc
        elif from_com:
            assert(isinstance(workfile,basestring))
            workpath=conf.strinterp('config',workfile,**morevars)
            logger.info('from_com workpath='+workpath)
        else:
            assert(isinstance(workfile,basestring))
            # os.path.join discards WORKhwrf when the interpolated
            # name is already absolute.
            workpath=os.path.join(
                conf.getdir('WORKhwrf'),
                conf.strinterp('config',workfile,**morevars))

        if comfile is None:
            if from_com:
                raise AssertionError('When copying from com, the comfile '
                                     'must be specified.')
            compath=os.path.join(
                conf.getdir('com'),
                conf.strinterp('config','{out_prefix}.{workbasename}',
                               workbasename=os.path.basename(workpath)))
        else:
            # As above, an absolute comfile replaces the com directory.
            compath=os.path.join(
                conf.getdir('com'),
                conf.strinterp('config',comfile,**morevars))
        logger.info('compath is '+compath)

        if from_com:
            frompath,topath=compath,workpath
        else:
            frompath,topath=workpath,compath

        # os.path.exists() is always False for a wildcard pattern, so
        # only pre-check literal paths.  A glob that matches nothing is
        # handled by the count==0 branch below.
        is_glob='*' in frompath or '?' in frompath
        if optional and not is_glob and not os.path.exists(frompath):
            logger.info('Optional file does not exist: '+repr(frompath))
            return True

        try:
            logger.info('deliver %s => %s'%(frompath,topath))
            count=deliver_multi(frompath,topath,logger=logger)
        except EnvironmentError as e:
            if optional:
                logger.warning('%s: cannot deliver: %s'%(workpath,str(e)))
                return True
            logger.error('%s: cannot deliver: %s'%(workpath,str(e)),
                         exc_info=True)
            self.__failures+=1
            return False

        if count==0:
            if optional:
                logger.warning('%s: no files matched this glob. This '
                               'file was optional, so I will continue.'
                               %(frompath,))
                return True
            logger.error('%s: no files matched this glob.'%(frompath,))
            self.__failures+=1
            return False
        return True
213 
214 ########################################################################
215 
def main():
    """!Delivers HWRF files to COM

    Initializes the hwrf_expt module, registers alerts, and installs
    the module-level "copier".  When the PARAFLAG environment variable
    is "NO", the AFOS product is passed to email_afos_to_sdm.  The
    wrfcopier is then run, and a Deliverer copies the storm vitals,
    jlogfile, POM files (when coupled with POM), log files, GSI-related
    files (when run_gsi is on) and the wrfdiag files to COM.  If any
    non-optional delivery failed, the process exits with status 1.

    After that, files are copied from COM to whichever of the
    outatcf, outdiag, outships, outstatus and outatcfcorrected
    directories are configured in the [dir] section."""
    global copier

    import hwrf_expt
    hwrf_expt.init_module(make_ensemble_da=True)
    # Make sure DBN alerts and other such things are triggered:
    hwrf_alerts.add_nhc_alerts()
    hwrf_alerts.add_regrib_alerts()
    hwrf_alerts.add_wave_alerts()

    copier=hwrf_expt.wrfcopier.compression_copier

    if 'NO' == os.environ.get('PARAFLAG','YES'):
        jlogger.info('Calling email_afos_to_sdm from output job to email the track.')
        afos=hwrf_expt.nhcp.product('afos')
        hwrf_alerts.email_afos_to_sdm(afos)
        jlogger.info('Done with email_afos_to_sdm. Will now celebrate by delivering many things to COM.')

    conf=hwrf_expt.conf
    relocation=conf.getbool('config','run_relocation',True)
    ocean=conf.getstr('config','ocean_model')
    coupled=conf.getbool('config','run_ocean',True)
    GSI=conf.getbool('config','run_gsi')
    run_ensemble_da=conf.getbool('config','run_ensemble_da',False)
    extra_trackers=conf.getbool('config','extra_trackers',False)
    fcstlen=conf.getint('config','forecast_length',126)

    ocstatus=hwrf_expt.ocstatus
    logger=conf.log('output')

    # Treat the run as uncoupled if the ocean status says so.
    if coupled and not ocstatus.get(logger):
        coupled=False

    hwrf_expt.wrfcopier.run()

    D=Deliverer(logger,conf)
    D['wrfdir']=hwrf_expt.runwrf.workdir
    D.deliver_file('{WORKhwrf}/tmpvit','{out_prefix}.storm_vit')

    if GSI:
        D['gsi_d02']=hwrf_expt.gsi_d02.outdir
        if hwrf_expt.gsid03_flag:
            D['gsi_d03']=hwrf_expt.gsi_d03.outdir

    # Multistorm - jtf
    logger.info('WRF run directory is %s'%(repr(D['wrfdir']),))

    D.deliver_file('{WORKhwrf}/jlogfile',optional=True)

    d01=hwrf_expt.moad
    d02=hwrf_expt.storm1outer
    d03=hwrf_expt.storm1inner

    if coupled and ocean=='POM':
        D.deliver_file('{wrfdir}/MDstatus',optional=True)
        for ocrest in ('el_initial.nc', 'grid.nc', 'ts_clim.nc',
                       'ts_initial.nc', 'uv_initial.nc' ):
            D.deliver_file('{wrfdir}/{vit[stormname]}.{ocrest}',
                           '{out_prefix}.pom.{ocrest}', ocrest=ocrest)

        # One file per whole forecast day; the +0.01 guards against
        # floating-point round-off just below an integer.
        for iday in range(int(math.floor(fcstlen/24.0+0.01))):
            ocrest="%04d.nc"%iday
            D.deliver_file('{wrfdir}/{vit[stormname]}.{ocrest}',
                           '{out_prefix}.pom.{ocrest}', ocrest=ocrest)

    logcount=0
    for ext in ('log','out','err'):
        globme=conf.getdir('WORKhwrf')+'/*.'+ext
        logger.info('Globbing for %s log files'%(globme,))
        for logfile in glob.glob(globme):
            logcount+=1
            D.deliver_file(logfile)
    logger.info('Found %d log file(s)'%(logcount,))

    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    # Deliver GSI stuff next.
    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    gsi_flag=conf.getbool('config','run_gsi')
    # gsiop: True means the GSI products are optional.
    gsiop=True
    if gsi_flag:
        gsiop=((not hwrf_expt.gsi_d02.completed) or
               (not hwrf_expt.gsi_d03.completed))
        if gsiop:
            logger.warning('GSI failed, so all GSI products are optional.')
        else:
            logger.info('GSI ran, so its products are mandatory.')
    if GSI:
        # Copy the original wrfinput file before DA:
        org_d01=hwrf_expt.gfs_init.realinit.wrfinput_at_time(
            hwrf_expt.cycle,d01)
        D.deliver_file(org_d01,'{out_prefix}.wrforg_d01',optional=gsiop)

    if GSI:
        # Get the FGAT initialization at the analysis time:
        ceninit=hwrf_expt.fgat_init.init_at_time(hwrf_expt.conf.cycle)

        # Copy the original wrfanl files before relocation:
        org_d02=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
        org_d03=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
        D.deliver_file(org_d02,'{out_prefix}.wrforg_d02',optional=gsiop)
        D.deliver_file(org_d03,'{out_prefix}.wrforg_d03',optional=gsiop)

        if relocation:
            # Copy the wrfanl files after relocation, but before GSI:
            ges_d02=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
            ges_d03=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
            D.deliver_file(ges_d02,'{out_prefix}.wrfges_d02',optional=gsiop)
            D.deliver_file(ges_d03,'{out_prefix}.wrfges_d03',optional=gsiop)


    if GSI:
        D.deliver_file('{gsi_d02}/satbias_out',
                       '{out_prefix}.gsi_cvs2.biascr',optional=gsiop)
        if hwrf_expt.gsid03_flag:
            D.deliver_file('{gsi_d03}/satbias_out',
                           '{out_prefix}.gsi_cvs3.biascr',optional=gsiop)

    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    # Lastly, deliver the diag files
    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    jlogger.info('Delivering wrfdiag files to com.')
    hwrf_expt.nhcp.deliver_wrfdiag()

    if D.failures>0:
        jlogger.critical('HWRF: unable to deliver %d non-optional products to com.'%int(D.failures))
        sys.exit(1)

    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    # Deliver things to noscrub for non-NCO runs # # # # # # # # # # #
    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
    # NOTE(review): this only logs; the deliveries below are gated by
    # havedir(), not by PARAFLAG.
    if conf.getbool('config','PARAFLAG'):
        logger.info('You are not NCO, so I will deliver files to noscrub.')
    else:
        logger.info('You are NCO so I will skip NOSCRUB deliveries.')

    D.reset()
    def fromcom(workpath,compath,optional=False):
        """!Copies compath (relative to COM) to workpath."""
        D.deliver_file(workpath,compath,from_com=True,optional=optional)

    def havedir(sdir):
        """!Returns True, after creating the directory, if the [dir]
        section has an entry named sdir; returns False otherwise."""
        there=conf.get('dir',sdir,'NOPE')
        if there=='NOPE':
            return False
        produtil.fileop.makedirs(there)
        return True

    if havedir('outatcf'):
        fromcom('{outatcf}','{out_prefix}.trak.hwrf.atcfunix')
    if havedir('outdiag'):
        fromcom('{outdiag}','{out_prefix}.trak.hwrf.3hourly*')
        fromcom('{outdiag}','{out_prefix}*resolution',True)
        fromcom('{outdiag}','{out_prefix}*htcf*stats',True)
        fromcom('{outdiag}','{out_prefix}*htcf',True)
        fromcom('{outdiag}','a*.dat')
        fromcom('{outdiag}','{out_prefix}.stats.tpc',optional=True)
        if extra_trackers:
            fromcom('{outdiag}','{com}/{out_prefix}.trak.hwrfd01.atcfunix')
            fromcom('{outdiag}','{com}/{out_prefix}.trak.hwrfd02.atcfunix')
    if havedir('outships'):
        fromcom('{outships}','figures/*.txt',optional=True)
    if havedir('outstatus'):
        fromcom('{outstatus}','{WORKhwrf}/submit.out',optional=True)
        timings=conf.strinterp('config','{outstatus}/{out_prefix}.timings')
        inout=conf.strinterp('config','{WORKhwrf}/hwrf_*.out')
        # Collect every line containing "TIMING" from the hwrf_*.out
        # files into one timings file.
        with open(timings,'wt') as outf:
            for inoutfile in glob.glob(inout):
                if not os.path.exists(inoutfile):
                    logger.warning('%s: file does not exist; skipping'
                                   %(inoutfile,))
                    # Actually skip it; open() below would raise.
                    continue
                with open(inoutfile,'rt') as inf:
                    for line in inf:
                        if 'TIMING' in line:
                            outf.write(line.rstrip()+'\n')
    if havedir('outatcfcorrected'):
        inatcf=conf.strinterp('config',
                              '{com}/{out_prefix}.trak.hwrf.atcfunix')
        outatcf=conf.strinterp(
            'config','{outatcfcorrected}/{out_prefix}.trak.hwrf.atcfunix')
        hwrf.tracker.jtwc_rewrite(inatcf,outatcf,logger)

    ####################################################################

    # Create the "done file" if ensda is entirely disabled. This is
    # used by the workflow layer to know when the cycle is entirely
    # complete, and can be deleted.

    # NOTE FOR FUTURE DEVELOPMENT: When the graphics are added to the
    # workflow, we will need to move the creation of this "done file"
    # to a later step, after the graphics. The logical candidate
    # would be a new job whose purpose is to check the cycle's entire
    # workflow to make sure it is finished.

    # NOTE(review): make_done and ensda_flag are computed here but
    # nothing below uses them, and no done file is written in this
    # function -- confirm whether that step is missing.
    make_done=True
    if run_ensemble_da:
        flag_file=conf.strinterp('tdrcheck','{tdr_flag_file}')
        try:
            ensda_flag=hwrf.ensda.read_ensda_flag_file(flag_file)
        except (EnvironmentError) as e:
            logger.error('%s: unable to get ensda_flag; assume False: %s'%(
                    flag_file,str(e)),exc_info=True)
            ensda_flag=False
416 
417 ########################################################################
418 
if __name__=='__main__':
    # Script entry point: set up produtil, run the delivery, and turn
    # any uncaught Exception into a logged abort with exit status 2.
    # SystemExit (e.g. main()'s sys.exit(1)) is not an Exception and
    # passes straight through.
    try:
        produtil.setup.setup()
        jlogger.info('hwrf_output is starting')
        main()
        jlogger.info('hwrf_output has completed')
    except Exception as err:
        abort_message='hwrf_output is aborting: '+str(err)
        jlogger.critical(abort_message,exc_info=True)
        sys.exit(2)
Runs the POM initialization and POM-WRF coupled forecast.
Definition: mpipomtc.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
def deliver_file(self, workfile, comfile=None, from_com=False, optional=False, kwargs)
Delivers one file.
def jtwc_rewrite
Rewrites track files as used by the HWRF WPAC parallels from 2013-2015.
Definition: tracker.py:26
A utility class for delivering files.
Contains setup(), which initializes the produtil package.
Definition: setup.py:1
def __init__(self, logger, conf)
Creates a new Deliverer with the specified logging.Logger and HWRFConfig.
def failures(self)
The number of failed deliveries.
def init_module
Initializes the HWRF object structure.
Definition: hwrf_expt.py:384
def __getitem__(self, key)
Gets a key from the hash of internal values used for string formatting, or raises KeyError if no such...
def email_afos_to_sdm(afos, args, kwargs)
Emails the AFOS file to the NOAA Senior Duty Meterologist (SDM)
Definition: hwrf_alerts.py:171
A piece of data produced by a Task.
Definition: datastore.py:716
def setup(ignore_hup=False, dbnalert_logger=None, jobname=None, cluster=None, send_dbn=None, thread_logger=False, thread_stack=2 **24, kwargs)
Initializes the produtil package.
Definition: setup.py:15
Utilities for ensemble-based data assimilation.
Definition: ensda.py:1
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
copier
The copier is a function that returns either None, or a callable object suitable for passing into the...
def add_wave_alerts()
DBN ALERTS #########################################################.
Definition: hwrf_alerts.py:99
def add_regrib_alerts()
Adds dbn alerts for GRIB products by adding DBNAlert objects to the hwrf_expt.gribber.
Definition: hwrf_alerts.py:123
def add_nhc_alerts()
Adds dbn alerts for the hwrf_nhc_products program's output by adding DBNAlert objects to the hwrf_exp...
Definition: hwrf_alerts.py:251
Configures logging.
Definition: log.py:1
def conf(self)
The hwrf.config.HWRFConfig object for this Deliverer.
def deliver_multi(sourceglob, target, logger)
Delivers several files to a target directory, logging messages as it goes.
def log
Gets the logging.Logger for this Deliverer.
This module contains tasks to prepare input for the GFDL Vortex Tracker, run the tracker and deliver ...
Definition: tracker.py:1
def read_ensda_flag_file(flag_file)
Reads the stormX.run_ensda flag file.
Definition: ensda.py:625
def main()
Delivers HWRF files to COM.
def reset(self)
Resets the number of failures to zero.
def __setitem__(self, key, value)
Sets a key in an internal dict of values used for string formatting.