20 import os, glob, sys, logging, math
37 """!Delivers several files to a target directory, logging messages as it goes
38 @param sourceglob the glob (from glob.glob()) that lists the input files
39 @param target the target directory
40 @param logger a logging.Logger for messages
41 @returns the number of files delivered"""
43 if sourceglob.find(
'*')>=0
or sourceglob.find(
'?')>=0:
44 logger.info(
'Globbing for: %s'%(sourceglob,))
45 for source
in glob.glob(sourceglob):
47 source,target,keep=
True,
48 logger=logger,copier=
copier(source))
52 sourceglob,target,keep=
True,
53 logger=logger,copier=
copier(sourceglob))
58 """!A utility class for delivering files. This implements most of
59 the functionality of the exhwrf_output."""
61 """!Creates a new Deliverer with the specified logging.Logger
63 @param logger a logging.Logger for log messages
64 @param conf an hwrf.config.HWRFConfig for configuration info"""
69 def log(self,sublog=None):
70 """!Gets the logging.Logger for this Deliverer.
71 @param sublog If "sublog" is provided, a new logging.Logger is
72 created for that subdomain of this Deliverer's logging domain."""
73 if sublog
is not None:
74 return logging.getLogger(self.__logger.name+
'.'+sublog)
77 """!Sets a key in an internal dict of values used for string
79 @param key the string formatting key
80 @param value what to replace the key with in string replacement"""
83 """!Gets a key from the hash of internal values used for string
84 formatting, or raises KeyError if no such key is found.
85 @param key the desired key"""
89 """!The number of failed deliveries."""
92 """!Resets the number of failures to zero."""
96 """!The hwrf.config.HWRFConfig object for this Deliverer."""
98 def deliver_file(self,workfile,comfile=None,from_com=False,
99 optional=
False,**kwargs):
100 """!Delivers one file.
102 Delivers the specified work area file to the specified com
103 location. OR, if from_com=True, it does the opposite: deliver
104 from com to the workfile. If the com location is not
105 specified, a suitable default will be chosen. If
106 optional=True, deliver_file will ignore the file if it is
107 missing. Otherwise, a missing file counts as a failure in the
108 internal failure counter. Additional keyword arguments are
109 sent to conf.strinterp. If the workfile is a relative path,
110 it is relative to the WORKhwrf directory.
112 When delivering to COM (from_com=False, the default), the
113 workfile can instead be a Product, in which case the
114 Product.location and .available are checked for availability
115 information. When delivering from COM (from_com=True), the
116 workfile must be a string path.
117 @param workfile the file in the work area
118 @param comfile the file in the COM directory
119 @param from_com if True, deliver from com to work, if False, do the opposite
120 @param optional if True, and the file is missing, return True
121 @param kwargs additional keyword arguments passed on to produtil.fileop.deliver_file()
122 @returns True on successful delivery, False on delivery
123 failure. If the file could not be delivered, but
124 optional=True, then True is returned."""
129 morevars.update(kwargs)
133 with workfile.dstore.transaction()
as t:
134 loc=workfile.location
135 av=workfile.available
136 if loc
is None or loc==
'':
138 logger.error(
'%s: no location'%(workfile.did,))
142 logger.warning(
'%s: no location'%(workfile.did,))
146 logger.error(
'%s: not available (location=%s)'%(
147 workfile.did,workfile.location))
151 logger.warning(
'%s: not available (location=%s)'%(
152 workfile.did,workfile.location))
155 workpath=workfile.location
157 assert(isinstance(workfile,basestring))
158 workpath=conf.strinterp(
'config',workfile,**morevars)
159 logger.info(
'from_com workpath='+workpath)
161 assert(isinstance(workfile,basestring))
162 workpath=os.path.join(
163 conf.getdir(
'WORKhwrf'),
164 conf.strinterp(
'config',workfile,**morevars) )
168 raise AssertionError(
'When copying from com, the comfile '
169 'must be specified.')
170 compath=os.path.join(
172 conf.strinterp(
'config',
'{out_prefix}.{workbasename}',
173 workbasename=os.path.basename(workpath)))
175 compath=os.path.join(
177 conf.strinterp(
'config',comfile,**morevars) )
178 logger.info(
'compath is '+compath)
179 if from_com
and optional
and not os.path.exists(compath):
180 logger.info(
'Optional file does not exist: '+repr(compath))
182 elif not from_com
and optional
and not os.path.exists(workpath):
183 logger.info(
'Optional file does not exist: '+repr(workpath))
192 logger.info(
'deliver %s => %s'%(frompath,topath))
194 except EnvironmentError
as e:
196 logger.warning(
'%s: cannot deliver: %s'%(workpath,str(e)))
198 logger.error(
'%s: cannot deliver: %s'%(workpath,str(e)),
204 logger.warning(
'%s: no files matched this glob. This '
205 'file was optional, so I will continue.'
209 logger.error(
'%s: no files matched this glob.'%(frompath,))
217 """!Delivers HWRF files to COM"""
226 copier=hwrf_expt.wrfcopier.compression_copier
228 if 'NO' == os.environ.get(
'PARAFLAG',
'YES'):
229 jlogger.info(
'Calling email_afos_to_sdm from output job to email the track.')
230 afos=hwrf_expt.nhcp.product(
'afos')
232 jlogger.info(
'Done with email_afos_to_sdm. Will now celebrate by delivering many things to COM.')
235 relocation=conf.getbool(
'config',
'run_relocation',
True)
236 ocean=conf.getstr(
'config',
'ocean_model')
237 coupled=conf.getbool(
'config',
'run_ocean',
True)
238 GSI=conf.getbool(
'config',
'run_gsi')
239 run_ensemble_da=conf.getbool(
'config',
'run_ensemble_da',
False)
240 extra_trackers=conf.getbool(
'config',
'extra_trackers',
False)
241 fcstlen=conf.getint(
'config',
'forecast_length',126)
243 ocstatus=hwrf_expt.ocstatus
244 logger=conf.log(
'output')
246 if coupled
and not ocstatus.get(logger):
249 hwrf_expt.wrfcopier.run()
252 D[
'wrfdir']=hwrf_expt.runwrf.workdir
253 D.deliver_file(
'{WORKhwrf}/tmpvit',
'{out_prefix}.storm_vit')
256 D[
'gsi_d02']=hwrf_expt.gsi_d02.outdir
257 if hwrf_expt.gsid03_flag:
258 D[
'gsi_d03']=hwrf_expt.gsi_d03.outdir
261 logger.info(
'WRF run directory is %s'%(repr(D[
'wrfdir']),))
263 D.deliver_file(
'{WORKhwrf}/jlogfile',optional=
True)
266 d02=hwrf_expt.storm1outer
267 d03=hwrf_expt.storm1inner
269 if coupled
and ocean==
'POM':
270 D.deliver_file(
'{wrfdir}/MDstatus',optional=
True)
271 for ocrest
in (
'el_initial.nc',
'grid.nc',
'ts_clim.nc',
272 'ts_initial.nc',
'uv_initial.nc' ):
273 D.deliver_file(
'{wrfdir}/{vit[stormname]}.{ocrest}',
274 '{out_prefix}.pom.{ocrest}', ocrest=ocrest)
276 for iday
in xrange(int(math.floor(fcstlen/24.0+0.01))):
277 ocrest=
"%04d.nc"%iday
278 D.deliver_file(
'{wrfdir}/{vit[stormname]}.{ocrest}',
279 '{out_prefix}.pom.{ocrest}', ocrest=ocrest)
282 for ext
in (
'log',
'out',
'err'):
283 globme=conf.getdir(
'WORKhwrf')+
'/*.'+ext
284 logger.info(
'Globbing for %s log files'%(globme,))
285 for log
in glob.glob(globme):
288 logger.info(
'Found %d log file(s)'%(logcount,))
293 gsi_flag=conf.getbool(
'config',
'run_gsi')
296 gsiop=((
not hwrf_expt.gsi_d02.completed)
or
297 (
not hwrf_expt.gsi_d03.completed))
299 logger.warning(
'GSI failed, so all GSI products are optional.')
301 logger.info(
'GSI ran, so its products are mandatory.')
304 org_d01=hwrf_expt.gfs_init.realinit.wrfinput_at_time(
306 D.deliver_file(org_d01,
'{out_prefix}.wrforg_d01',optional=gsiop)
310 ceninit=hwrf_expt.fgat_init.init_at_time(hwrf_expt.conf.cycle)
313 org_d02=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
314 org_d03=ceninit.runwrfanl.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
315 D.deliver_file(org_d02,
'{out_prefix}.wrforg_d02',optional=gsiop)
316 D.deliver_file(org_d03,
'{out_prefix}.wrforg_d03',optional=gsiop)
320 ges_d02=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d02)
321 ges_d03=ceninit.rstage3.wrfanl_at_time(hwrf_expt.conf.cycle,d03)
322 D.deliver_file(ges_d02,
'{out_prefix}.wrfges_d02',optional=gsiop)
323 D.deliver_file(ges_d03,
'{out_prefix}.wrfges_d03',optional=gsiop)
327 D.deliver_file(
'{gsi_d02}/satbias_out',
328 '{out_prefix}.gsi_cvs2.biascr',optional=gsiop)
329 if hwrf_expt.gsid03_flag:
330 D.deliver_file(
'{gsi_d03}/satbias_out',
331 '{out_prefix}.gsi_cvs3.biascr',optional=gsiop)
336 jlogger.info(
'Delivering wrfdiag files to com.')
337 hwrf_expt.nhcp.deliver_wrfdiag()
340 jlogger.critical(
'HWRF: unable to deliver %d non-optional products to com.'%int(D.failures))
346 if conf.getbool(
'config',
'PARAFLAG'):
347 logger.info(
'You are not NCO, so I will deliver files to noscrub.')
349 logger.info(
'You are NCO so I will skip NOSCRUB deliveries.')
352 def fromcom(workpath,compath,optional=False):
353 D.deliver_file(workpath,compath,from_com=
True,optional=optional)
356 there=conf.get(
'dir',sdir,
'NOPE')
362 if havedir(
'outatcf'):
363 fromcom(
'{outatcf}',
'{out_prefix}.trak.hwrf.atcfunix')
364 if havedir(
'outdiag'):
365 fromcom(
'{outdiag}',
'{out_prefix}.trak.hwrf.3hourly*')
366 fromcom(
'{outdiag}',
'{out_prefix}*resolution',
True)
367 fromcom(
'{outdiag}',
'{out_prefix}*htcf*stats',
True)
368 fromcom(
'{outdiag}',
'{out_prefix}*htcf',
True)
369 fromcom(
'{outdiag}',
'a*.dat')
370 fromcom(
'{outdiag}',
'{out_prefix}.stats.tpc',optional=
True)
372 fromcom(
'{outdiag}',
'{com}/{out_prefix}.trak.hwrfd01.atcfunix')
373 fromcom(
'{outdiag}',
'{com}/{out_prefix}.trak.hwrfd02.atcfunix')
374 if havedir(
'outships'):
375 fromcom(
'{outships}',
'figures/*.txt',optional=
True)
376 if havedir(
'outstatus'):
377 fromcom(
'{outstatus}',
'{WORKhwrf}/submit.out',optional=
True)
378 timings=conf.strinterp(
'config',
'{outstatus}/{out_prefix}.timings')
379 inout=conf.strinterp(
'config',
'{WORKhwrf}/hwrf_*.out')
380 with open(timings,
'wt')
as outf:
381 for inoutfile
in glob.glob(inout):
382 if not os.path.exists(inoutfile):
383 logger.warning(
'%s: file does not exist; skipping'
385 with open(inoutfile,
'rt')
as inf:
387 if line.find(
'TIMING')>=0:
388 print>>outf,line.rstrip()
389 if havedir(
'outatcfcorrected'):
390 inatcf=conf.strinterp(
'config',
391 '{com}/{out_prefix}.trak.hwrf.atcfunix')
392 outatcf=conf.strinterp(
393 'config',
'{outatcfcorrected}/{out_prefix}.trak.hwrf.atcfunix')
409 flag_file=conf.strinterp(
'tdrcheck',
'{tdr_flag_file}')
412 except (EnvironmentError)
as e:
413 logger.error(
'%s: unable to get ensda_flag; assume False: %s'%(
414 flag_file,str(e)),exc_info=
True)
419 if __name__==
'__main__':
422 jlogger.info(
'hwrf_output is starting')
424 jlogger.info(
'hwrf_output has completed')
425 except Exception
as e:
426 jlogger.critical(
'hwrf_output is aborting: '+str(e),exc_info=
True)
Runs the POM initialization and POM-WRF coupled forecast.
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
def deliver_file(self, workfile, comfile=None, from_com=False, optional=False, kwargs)
Delivers one file.
def jtwc_rewrite
Rewrites track files as used by the HWRF WPAC parallels from 2013-2015.
A utility class for delivering files.
Contains setup(), which initializes the produtil package.
def __init__(self, logger, conf)
Creates a new Deliverer with the specified logging.Logger and HWRFConfig.
def failures(self)
The number of failed deliveries.
def init_module
Initializes the HWRF object structure.
def __getitem__(self, key)
Gets a key from the hash of internal values used for string formatting, or raises KeyError if no such...
def email_afos_to_sdm(afos, args, kwargs)
Emails the AFOS file to the NOAA Senior Duty Meterologist (SDM)
A piece of data produced by a Task.
def setup(ignore_hup=False, dbnalert_logger=None, jobname=None, cluster=None, send_dbn=None, thread_logger=False, thread_stack=2 **24, kwargs)
Initializes the produtil package.
Utilities for ensemble-based data assimilation.
Stores products and tasks in an sqlite3 database file.
def makedirs
Make a directory tree, working around filesystem bugs.
copier
The copier is a function that returns either None, or a callable object suitable for passing into the...
def add_wave_alerts()
DBN ALERTS #########################################################.
def add_regrib_alerts()
Adds dbn alerts for GRIB products by adding DBNAlert objects to the hwrf_expt.gribber.
def add_nhc_alerts()
Adds dbn alerts for the hwrf_nhc_products program's output by adding DBNAlert objects to the hwrf_exp...
def conf(self)
The hwrf.config.HWRFConfig object for this Deliverer.
def deliver_multi(sourceglob, target, logger)
Delivers several files to a target directory, logging messages as it goes.
def log
Gets the logging.Logger for this Deliverer.
This module contains tasks to prepare input for the GFDL Vortex Tracker, run the tracker and deliver ...
def read_ensda_flag_file(flag_file)
Reads the stormX.run_ensda flag file.
def main()
Delivers HWRF files to COM.
def reset(self)
Resets the number of failures to zero.
def __setitem__(self, key, value)
Sets a key in an internal dict of values used for string formatting.