1 """!This module contains tasks to prepare input for the GFDL Vortex
2 Tracker, run the tracker and deliver its results."""
# Names exported by a star-import of this module.  The contents and
# their order are unchanged; only stray line-number tokens that had been
# fused into the statement (making it a syntax error) were removed.
__all__ = [
    'hwrf_combine_subset',
    'tracker_subset',
    'expandlatlon',
    'vinttave',
    'TrackerTask',
    'GRIB1VintTave',
    'jtwc_rewrite',
]
9 import re, shutil, os, tempfile, math, datetime, sys
13 from hwrf.numerics import TimeArray, to_fraction, to_datetime, \
14 to_datetime_rel, partial_ordering, to_timedelta
17 from produtil.run import runstr,bigexe,alias,checkrun,runsync
21 from hwrf.exceptions import TrackerError,TrackerModeError,TrackerStormError, \
22 MissingGRIBError,GRIBLocationError
27 """!Rewrites track files as used by the HWRF WPAC parallels from 2013-2015
28 @param inatcf input ATCF file
29 @param outatcf output ATCF file
30 @param logger a logging.Logger for log messages"""
31 with open(inatcf,
'rt')
as inf:
32 with open(outatcf,
'wt')
as outf:
39 .replace(
'-9,',
' 0,')\
40 .replace(
'-99,',
' 0,')\
41 .replace(
'-999,',
' 0,')\
42 .replace(
'-9999,',
' 0,')\
47 if logger
is not None:
48 logger.info(
'ATCF LINE: %s'%(line,))
50 elif logger
is not None:
51 logger.info(
'SKIP LINE: %s'%(line,))
57 """!Selects lines for the hwrfprs_c files.
59 This subroutine parses an array of lines from wgrib -s and selects
60 lines to send into the hwrfprs_c and hwrfsat_c GRIB1/2 output
61 files (hurricane_region=True) or hwrfprs_g and hwrfsat_g files
62 (hurricane_region=False). Long ago (in 2011) this was the input
63 to the tracker. In the current system, it contains fields the
64 tracker does not need, lacks some fields it does need, and is in
65 the wrong order for it to be accepted as tracker input. It
66 contains synthetic satellite and other diagnostic quantities that
67 are useful for various data analysis in the high resolution
68 combined grid. The tracker scripts no longer use this output set
69 for any purpose, but it is still valuable for diagnostics. This
70 is in the hwrf.tracker module for historical reasons: the
71 hwrfprs_c files were originally intended for the tracker.
72 @param wgribout output from wgrib
73 @param hurricane_region if True, a more limited set of variables is kept"""
78 var3d=[
'UGRD',
'VGRD',
'RH',
'ABSV']
83 var3d=[
'UGRD',
'VGRD',
'RH',
'SPFH',
'ABSV',
'HGT',
'TMP']
86 for lev
in [ 1000, 850, 700, 500, 250 ]:
89 if line.find(
'%s:%d mb:'%(var,lev))>=0:
95 for lev
in [ 250, 300, 350, 400, 450, 500, 600, 700, 850, 1000 ]:
96 for var
in [
'HGT',
'TMP']:
98 if line.find(
'%s:%d mb:'%(var,lev))>=0:
102 for line
in wgribout:
103 if line.find(
' mb:')<0
and \
104 re.search(
'PRMSL|SBT|BRT|AMSRE|trop|HPBL|PWAT'
105 '|CAPE|CIN|column|REFC|HLCY|GRD:10 m above'
106 '|TPCP:sfc|TMP:2 m above|RH:2 m above',line):
110 """!A simple wrapper around hwrf_diagnostic_subset that provides a
111 subset of the HWRF fields and vertical levels suitable for
112 analyzing the large-scale hurricane structure.
113 @param w output from wgrib"""
118 """!A simple wrapper around hwrf_diagnostic_subset that provides a
119 subset of the HWRF fields and vertical levels suitable for
120 analyzing the synoptic-scale or global fields and comparison to
121 the parent global model.
122 @param w output from wgrib"""
# Field selectors, in the order the tracker expects its input records.
# tracker_subset uses this list as its OUTER loop (one pass over the
# wgrib -s lines per entry), so the sequence below determines the order
# of the subsetted output.  Do not sort, regroup or de-duplicate it.
# The trailing blank in 'UGRD:10 m ' and 'VGRD:10 m ' is part of the
# selector text and must be kept byte-for-byte.
TRACKER_SUBSET = [
    'HGT:925', 'HGT:850', 'HGT:700', 'UGRD:850', 'UGRD:700',
    'UGRD:500', 'VGRD:850', 'VGRD:700', 'VGRD:500',
    'UGRD:10 m ', 'VGRD:10 m ', 'ABSV:850', 'ABSV:700',
    'PRMSL', 'HGT:900', 'HGT:800', 'HGT:750', 'HGT:650',
    'HGT:600', 'HGT:550', 'HGT:500', 'HGT:450', 'HGT:400',
    'HGT:350', 'HGT:300', 'HGT:250', 'TMP:500', 'TMP:450',
    'TMP:400', 'TMP:350', 'TMP:300', 'TMP:250',
]
"""The subset of fields needed from an HWRF GRIB1 file in order to run
the tracker.  This is used by the tracker_subset function."""
142 """!This is a GRIB subsetter intended to be sent to the subset
143 operator of an hwrf.regrib.GRIB1Op object, to produce GRIB1 files
144 that contain only fields needed by the tracker.
146 This function iterates over wgrib -s output lines. It subsets the
147 lines, returning the ones the tracker expects, in the exact order
148 the tracker expects them. The input is an array of lines from
149 wgrib -s. This is intended to be used with the gribtask and
150 regrib modules by doing gribop/tracker_subset to produce a GRIB1Op
151 that will make a tracker subset of the given GRIB1 file.
152 @param wgribout output from wgrib"""
153 for findme
in TRACKER_SUBSET:
154 for line
in wgribout:
165 def action_expandlatlon(op,regrib,name,center,west,east,north,south,n0,n1,
166 scan,res0,res1,**kwargs):
167 """!Internal function that expands a lat-lon grid to make a bigger one.
169 This is part of the implementation of expandlatlon and should not
170 be called directly. Given the input grid from center.grib1grid,
171 expand the grid in all four compass directions by a specified
172 number of degrees, and produce a new grid with a different
173 resolution, scanning mode and gridpoint count
175 @param op the hwrf.regrib.GRIBOp being run
176 @param regrib the hwrf.regrib.Regrib for data storage
177 @param name the grid name
178 @param center the GRIB1 product being centered upon
179 @param west,east,north,south how many degrees to add in each direction (a float)
180 @param n0,n1 gridpoint counts for the GRIB1 grid 255
181 @param scan scanning mode flags for the GRIB1 grid 255
182 @param res0,res1 resolution information for GRIB1 grid 255
183 @param kwargs Ignored."""
184 ingrid=center.grib1grid
185 if regrib.logger
is not None:
186 regrib.logger.info(
'expandlatlon of input grid %s'%(ingrid,))
188 oldgrid=[int(x)
for x
in ingrid.split()]
190 ( ogid, ojunk1, onscount, oewcount, onlat, oelon, oscan, oslat, \
191 owlon, onsres, oewres, ojunk2 ) = oldgrid
197 if 0 != (oscan&128): (owlon,oelon) = (oelon,owlon)
198 if 0 != (oscan&64): (onlat,oslon) = (oslon,onlat)
199 if 0 != (oscan&32): (onsres,oewres) = (oewres,onsres)
201 (iwlon,ielon)=(owlon-int(1000*west), oelon+int(1000*east))
202 (islat,inlat)=(oslat-int(1000*south),onlat+int(1000*north))
205 if 0 != (iscan&128): (iwlon,ielon) = (ielon,iwlon)
206 if 0 != (iscan&64): (inlat,islat) = (islat,inlat)
207 if 0 != (iscan&32): (n0,n1) = (n1,n0)
209 s =
'255 0 %d %d %d %d %d %d %d %d %d 0' % \
210 ( int(n0), int(n1), inlat, ielon, iscan, islat, iwlon,
211 int(1000*res0), int(1000*res1) )
212 if regrib.logger
is not None:
213 regrib.logger.info(
'expandlatlon output grid is %s'%(s,))
217 """!Creates a GRID suitable for use with the hwrf.regrib module.
218 Given the input grid from center.grib1grid, expand the grid in all
219 four compass directions by a specified number of degrees, and
220 produce a new grid with a different resolution, scanning mode and
222 @param center the hwrf.regrib.GRIBBase on which to center
223 @param west,east,north,south number of degrees to expand
224 @param n a pair of ints, the GRIB1 grid 255 gridpoint count information
225 @param scan the GRIB1 grid 255 scanning mode information
226 @param res a pair of ints, the GRIB1 grid 255 resolution information
227 @param name the name of the grid
228 @returns a new hwrf.regrib.GRIBGridCompute that will compute
229 the expanded grid for any forecast time for which the center
230 object can provide inputs"""
231 assert(len(n)==2
and n[0]>0
and n[1]>0)
232 assert(scan>=0
and scan<=255)
233 assert(len(res)==2
and res[0]>0
and res[1]>0)
235 name=
'elatlon%dx%dp'%(int(n[0]),int(n[1]))
237 west,east,north,south,n[0],n[1],scan,res[0],res[1])
242 """!A GRIB1Op that runs vint and tave: two of the tracker prep
243 programs. They interpolate data vertically and temporally to
244 produce supplemented input GRIB files needed by the tracker
245 thermodynamic diagnostic output fields."""
247 """!GRIB1VintTave constructor"""
248 super(GRIB1VintTave,self).
__init__(
None,op)
249 assert(
'_args' in self.__dict__)
250 def make(self,regrib,**kwargs):
251 """!This runs the actual vint and tave programs on the GRIB1
252 file that comes from the argument provided to the constructor.
253 @param regrib the hwrf.regrib.Regrib for data storage information
254 @param kwargs passed to self.arg.make()"""
256 for arg
in self.
args():
257 ingrib=arg.make(regrib,**kwargs)
259 assert(ingrib
is not None)
262 blocksize=1048576,atime=
None,**kwargs):
263 """!Performs the actual work of GRIB1VintTave.make: copying
264 input, running vint and tave, copying output. Do not call
267 @param regrib The hwrf.regrib.Regrib for data storage information
268 @param ingrib the input GRIBBase that provides input files
269 @param time the time at which the data is being processed
270 @param task the hwrf.gribtask.GRIBTask that stores final output
271 @param blocksize a block size for I/O
272 @param atime analysis time
273 @param kwargs Ignored."""
274 infile=ingrib.grib1file
275 grid=ingrib.grib1grid
277 if atime
is None and task
is not None and 'atime' in task.__dict__:
280 assert(grid
is not None)
285 inindex = regrib.tempfile(
'grbindex')
286 znamelist = regrib.tempfile(
'vint_input')
287 tnamelist = regrib.tempfile(
'tint_input')
288 tave_namelist = regrib.tempfile(
'tave_input')
289 (zfile,zindex) = regrib.gribtemp(
'vint.z.grb1')
290 (tfile,tindex) = regrib.gribtemp(
'vint.t.grb1')
291 (tavefile,taveindex) = regrib.gribtemp(
'tave.grb1')
292 (outfile,outindex) = regrib.gribtemp(
'vinttave')
293 outgrbindex=regrib.tempfile(
'vinttave.ix')
295 if regrib.logger
is not None:
296 regrib.logger.info(
'vint-tave starting: infile=%s '
298 %(repr(infile),repr(inindex),repr(grid)))
303 vint=alias(bigexe(task.getexe(
'vint')))
304 tave=alias(bigexe(task.getexe(
'tave')))
305 grbindex=alias(bigexe(task.getexe(
'grbindex')))
310 fl=produtil.fileop.fortlink
311 if logger
is not None:
312 logger.info(
'ftime for vinttave is %s'%(str(time),))
314 fahr=task.conftimestrinterp(
'{fahr}',ftime=time,atime=atime)
315 def runit(cmd,runner=checkrun):
316 if logger
is not None: logger.info(repr(cmd))
320 if regrib.logger
is not None:
321 regrib.logger.info(
'generate grbindex file %s'%(inindex,))
322 runit(grbindex[infile,inindex])
324 with open(znamelist,
'wt')
as f:
325 nmltxt=
'&timein ifcsthour=%s, iparm=7 , gribver=1, '\
326 'g2_jpdtn=0/\n'%(fahr,)
328 if logger
is not None: logger.info(
'znamelist: '+nmltxt)
329 fl({11:infile, 16:task.confstr(
'hgt_levs'), 31:inindex, 51:zfile},
331 runit(vint < znamelist)
332 runit(grbindex[zfile,zindex])
334 with open(tnamelist,
'wt')
as f:
335 nmltxt=
'&timein ifcsthour=%s, iparm=11 , gribver=1, '\
336 'g2_jpdtn=0/\n'%(fahr,)
338 if logger
is not None: logger.info(
'tnamelist: '+nmltxt)
339 fl({11:infile, 16:task.confstr(
'tmp_levs'),
340 31:inindex, 51:tfile},force=
True)
341 runit(vint < tnamelist)
342 runit(grbindex[tfile,tindex])
344 with open(tave_namelist,
'wt')
as f:
345 nmltxt=
'&timein ifcsthour=%s, iparm=11 , gribver=1, '\
346 'g2_jpdtn=0/\n'%(fahr,)
348 if logger
is not None: logger.info(nmltxt)
349 fl({11:tfile, 31:tindex, 51:tavefile},force=
True)
350 runit(tave < tave_namelist)
351 runit(grbindex[tavefile,taveindex])
354 runit(copygb[dashg,
'-k4*-1 33 100 850',infile,inindex,outfile+
'.u850'])
355 runit(copygb[dashg,
'-k4*-1 33 100 700',infile,inindex,outfile+
'.u700'])
356 runit(copygb[dashg,
'-k4*-1 33 100 500',infile,inindex,outfile+
'.u500'])
357 runit(copygb[dashg,
'-k4*-1 33 105 10',infile,inindex,outfile+
'.u10m'])
358 runit(copygb[dashg,
'-k4*-1 7 100 850',infile,inindex,outfile+
'.z850'])
359 runit(copygb[dashg,
'-k4*-1 7 100 700',infile,inindex,outfile+
'.z700'])
360 runit(copygb[dashg,
'-k4*-1 2 102 0',infile,inindex,outfile+
'.mslp'])
361 runit(copygb[dashg, zfile, zindex, outfile+
'.phase'])
362 runit(copygb[dashg, tavefile,taveindex,outfile+
'.tave'])
364 with open(outfile,
'wb')
as outf:
365 for ext
in [
'u850',
'u700',
'u500',
'z850',
'z700',
366 'mslp',
'u10m',
'phase',
'tave']:
367 partname=
'%s.%s'%(outfile,ext)
368 if logger
is not None and not \
371 '%s: input file is empty! Things may break later.'
373 with open(partname,
'rb')
as inf:
374 shutil.copyfileobj(inf,outf)
376 runit(grbindex[outfile,outgrbindex])
377 if logger
is not None:
378 logger.info(
'vint-tave completed: infile=%s inindex=%s grid=%s'
379 %(repr(outfile),repr(outgrbindex),repr(grid)))
382 grib1grbindex=outgrbindex)
383 assert(retval
is not None)
# Backward-compatibility alias: older callers refer to the GRIB1VintTave
# class by the lowercase name "vinttave" (which is also what __all__
# exports alongside the class name).
vinttave=GRIB1VintTave
"""An alias for GRIB1VintTave for backward compatibility"""
394 """!A FileProduct for tracker output.
396 This is part of the internal implementation of TrackerTask and
397 should not be used directly. It is a FileProduct that contains
398 the raw, unmodified tracker output"""
400 """!Copies the tracker output to its destination without
401 modification. Sets availability and calls callbacks.
402 @param tracker the Task that ran the tracker
403 @param fhr the forecast hour, ignored
404 @param logger a logging.Logger for log messages"""
405 infile=
'output.atcfunix'
406 if logger
is None: logger=tracker.log()
407 deliver_file(infile,self.
location,keep=
True,logger=logger)
410 logger.warning(
'%s: has no callbacks, so no DBN and no NHC delivery'%(self.
did,))
412 logger.warning(
'%s: has callbacks. Will call them.'%(self.
did,))
416 """!A FileProduct that delivers a subset of a track.
418 This is part of the internal implementation of TrackerTask and
419 should not be used directly. It is a FileProduct that delivers a
420 cleaned-up, subsetted version of the tracker output."""
422 """!Create a CleanATCFSubsetProduct that will deliver a track
423 with the specified last allowable forecast hour.
424 @param kwargs passed to produtil.datastore.FileProduct.__init__()"""
425 super(CleanATCFSubsetProduct,self).
__init__(**kwargs)
430 """!Reads the tracker output, discarding everything after the
431 specified forecast hour. If an invalid line is found after
432 time 0, that line and everything after it is discarded.
434 @param tracker the task that ran the tracker
435 @param fhr the last forecast hour to include
436 @param logger a logging.Logger for messages"""
437 if logger
is None: logger=tracker.log()
438 infile=
'output.atcfunix'
443 except (KeyError,TypeError,ValueError)
as e:
445 fhr=
get(
'subset_fhr',fhr)
446 freq=
get(
'subset_freq',1)
447 cut=
get(
'line_cut',
None)
449 with open(infile,
'rt')
as fin:
450 with tempfile.NamedTemporaryFile(delete=
False,
451 mode=
'w+t',bufsize=1,prefix=str(self.
location),dir=
'.') \
454 ofstat=os.fstat(fout.fileno())
455 os.fchmod(fout.fileno(),ofstat.st_mode|0644)
458 tracker,fhr,freq,cut,inline)
461 if outline
is not None:
465 deliver_file(outfile,self.
location,keep=
False,logger=logger)
468 logger.warning(
'%s: has no callbacks, so no DBN and no NHC delivery'%(self.
did,))
470 logger.warning(
'%s: has callbacks. Will call them.'%(self.
did,))
473 """!Processes one line of the track file. Decides if it is
474 time to cease processing of the track file (cut off the track)
477 @param tracker the task that ran the tracker
478 @param lastfhr the last forecast hour to include
479 @param freq the output frequency in hours. This allows
480 one to turn an hourly track into a six hourly track
481 @param cut index of the last character to include from each line
482 For example, 112 would only include up to the RMW field.
483 @param line the line to process.
484 @return A tuple (line,stop) where "line" is the line to append
485 to the output track, and "stop" is true iff it is time to stop
489 wind=int(line[47:51])
493 'Invalid wind in track file: wind is 0 at forecast '
494 'hour %s. Will cut off the track file at this '
495 'forecast hour.'%(repr(fhr),))
497 elif freq>1
and abs(fhr%freq)>=1:
499 elif cut
is not None:
500 return (line.rstrip()[0:cut]+
'\n',fhr>lastfhr)
502 return (line,fhr>lastfhr)
503 except (ValueError,KeyError)
as e:
508 tracker.log().error(
'Ignoring invalid ATCF line: %s'%(rline,))
509 tracker.log().warning(
'''Invalid ATCF line found. Wind or forecast hour are missing or invalid.
512 % (rline,str(e)), exc_info=
True)
520 """!This Task is used by the delivery script executed by the gettrk
521 program itself. It derives directly from Task and has no
522 knowledge of the surrounding HWRF workflow: it only knows about
523 the local track filename, destination filenames and final forecast
524 hours for each destination file."""
525 def __init__(self,dstore,taskname,fhr,fmin,logger=None,**kwargs):
526 """!Creates a TrackerView for the specified datastore and
527 taskname. It will deliver a file subsetted to the specified
528 forecast hour and minute.
530 @param dstore the produtil.datastore.Datastore for database info
531 @param taskname the task name in the database
532 @param fhr,fmin the last expected forecast time
533 @param logger The given logger is used to log
535 @param kwargs Other keyword arguments are passed to the
536 produtil.datastore.Task.__init__() constructor."""
537 super(TrackerView,self).
__init__(dstore=dstore,taskname=taskname,
538 logger=logger,**kwargs)
548 """!Performs the actual delivery. Queries the database for
549 all products for this task that match certain requirements.
550 All of this task's RawATCFProduct products are delivered, as
551 well as any CleanATCFSubsetProduct products that have the
552 right forecast time. Type name is obtained from the
553 database's prodtype column in the PRODUCTS table. It is
554 assumed that the products for this task have a database id
555 beginning with "taskname::" where "taskname" is this task's
556 name. These assumptions match what is done by the
563 with ds.transaction()
as t:
564 prodids=t.query(
'SELECT id,type FROM products WHERE id LIKE "'
566 for (prodid,prodtype)
in prodids:
567 logger.info(
'%s: prodtype=%s'%(prodid,prodtype))
568 m=re.match(
'(.*)::(.*)',prodid)
570 logger.warning(
'%s: product id does not match '
571 'category::prodname pattern'%(prodid,))
572 (category,prodname)=m.groups()
574 logger.info(
'%s: not my track (want category %s got %s)'
575 %(prodid,me,category))
576 elif prodtype==
'RawATCFProduct':
579 work.append(prodobj.deliver)
580 elif prodtype==
'CleanATCFSubsetProduct':
583 dstore=ds,prodname=prodname,category=category)
584 if prodobj.available:
585 logger.info(
'%s: already delivered. Will '
586 'not recopy.'%(prodid,))
588 if 'subset_fhr' in prodobj:
589 subset_fhr=float(prodobj[
'subset_fhr'])
592 '%s: not delivering yet. Too early: '
593 '%fhrs > %fhrs'%(prodid,subset_fhr,now))
597 '%s: It is time to deliver now. Deliver'
598 ' at %fhrs, time is %fhrs'
599 %(prodid,subset_fhr,now))
601 logger.warning(
'%s: CleanATCFSubsetProduct has '
602 'no subset_fhr.'%(prodid,))
603 work.append(prodobj.deliver)
604 except Exception
as e:
605 logger.error(
'%s: unexpected exception: %s'%(
606 prodid,str(e)),exc_info=
True)
610 '%s: unrecognized prod type "%s" (only '
611 'RAWATCFProduct or CleanATCFSubsetProduct allowed).'
612 ' Will not deliver this while gettrk is running.'
619 func(self, now,logger=logger)
620 if abs(now)<1e-3
or ( now>0
and abs((now-6.0)%24.0)==0 ):
621 self.
postmsg(
'%s reached fhr=%s fmin=%s'
623 logger.info(
'%s reached fhr=%s fmin=%s'
632 tracker_delivery_script=
'''#! /usr/bin/env python
634 import produtil.setup, produtil.datastore
636 produtil.setup.setup()
637 t=hwrf.tracker.TrackerView(
638 dstore=produtil.datastore.Datastore("""{dstorepath}"""),
639 taskname="""{taskname}""",
640 fhr=int(sys.argv[1]),
641 fmin=int(sys.argv[2]) )
642 t.log().info("In ./deliver script")
644 """The delivery script for the tracker. This should not be used
645 directly: the TrackerTask sends it through .format(...) to insert
650 """!This task runs the GFDL Vortex Tracker on HWRF output. It is
651 intended to be used with the GRIBTask. At present, this only
652 supports a moving grid tracker. Placeholders are present for a
653 stationary grid tracker invocation, but those are not tested."""
658 def __init__(self,ds,conf,section,start,end,step,write_vit=True,
659 phaseflag=
True,structflag=
False,ikeflag=
False,
660 realtime=
True,**kwargs):
661 """!Creates a new TrackerTask
663 @param ds the produtil.datastore.Datastore to use
664 @param conf the hwrf.config.HWRFConfig for configuration options
665 @param section the HWRFConfig section to use
666 @param start,end,step the times at which data may be available
668 @param realtime the tracker should wait for data to appear and
669 call delivery scripts at every forecast time.
670 @param write_vit,phaseflag,structflag,ikeflag Passed on to the tracker namelist.
671 @param kwargs passed to hwrf.hwrftask.HWRFTask.__init__()
673 See the tracker documentation for details."""
674 super(TrackerTask,self).
__init__(ds,conf,section,**kwargs)
675 self.
_flags={
'write_vit':bool(write_vit),
676 'phaseflag':bool(phaseflag),
677 'structflag':bool(structflag),
678 'ikeflag':bool(ikeflag),
679 'realtime':bool(realtime)}
682 self.
_start=to_datetime(start)
683 self.
_end=to_datetime_rel(end,start)
684 self.
_step=to_timedelta(step)
689 self.
_gmodname=self._nml.nl_get(
'fnameinfo',
'gmodname')
690 self.
_rundescr=self._nml.nl_get(
'fnameinfo',
'rundescr')
691 self._nml.nl_set_if_unset(
'atcfinfo',
'atcfymdh',
692 int(self._start.strftime(
'%Y%m%d%H')))
694 """!Requests delivery of the raw, unmodified atcfunix file to
695 the specified location. This creates a new RawATCFProduct
696 with the specified product name and category. If the category
697 is unspecified or None, then this class's taskname will be
698 used. Note that the taskname MUST be used if running in
700 @param prodname the name of the new product
701 @param location the output file location, which will
702 be sent through confstrinterp() to do string replacement
703 on forecast and analysis times, and other variables
704 @param category if specified, the product category. By default,
705 this task's taskname is used."""
706 category=category if(category
is not None)
else self.
taskname
709 prodname=prodname,category=category,location=sloc)
711 self._deliveries.append((
None, prod))
715 """!Returns the tracker Product with the specified product name
716 or raises KeyError if none is found.
717 @param name the product name"""
720 """!Iterates over all tracker Products.
721 @param name only iterate over this named product"""
722 if name
is not None and name
in self.
_products:
725 for (n,p)
in self._products.iteritems():
yield p
727 """!Requests delivery of a cleaned subset of the tracker
728 atcfunix file, only up to the specified forecast hour. This
729 creates a new CleanATCFSubsetProduct with the specified
730 product name and category. If the category is unspecified or
731 None, then this class's taskname will be used. Note that the
732 taskname MUST be used if running in realtime=True mode.
733 @param prodname the name of the product
734 @param location the delivery location, which will be sent
735 through confstrinterp() to perform string replacement
736 @param fhr the last forecast hour
737 @param category the product category to use instead of self.taskname
738 @param freq the output ATCF frequency in hours
739 @param cut the last character to include in each line. For example,
742 fhr=int(to_fraction(self.
_end-self.
_start)/3600.0)
744 elif isinstance(fhr,datetime.timedelta):
745 fhr=math.ceil(to_fraction(fhr)/3600.0)
747 fhr=int(math.ceil(fhr))
748 sys.stderr.write(
'FHR='+repr(fhr)+
'\n')
749 assert(fhr>=0
and fhr<=126)
750 category=category if(category
is not None)
else self.
taskname
752 assert(fhr
is not None)
754 prodname=prodname,category=category,location=sloc)
755 prod[
'subset_fhr']=str(fhr)
756 prod[
'subset_freq']=str(freq)
758 prod[
'line_cut']=str(cut)
760 prod[
'line_cut']=
'9999'
762 assert(
'subset_fhr' in prod)
763 assert(prod[
'subset_fhr']
is not None)
764 assert(prod[
'subset_fhr'] !=
'')
766 self._deliveries.append((fhr, prod))
770 """!Requests delivery of a cleaned-up version of the full
771 length atcfunix file. This is the same as calling
772 send_atcfunix_subset with the full simulation length as the
773 end time. The prodname will be the Product's name, and the
774 location is the delivery location. If specified and not None,
775 the category will be the Product's category, otherwise it will
776 be this Task's taskname. Note that if realtime=True is
777 enabled, the category must be the taskname.
778 @param prodname the new product's name
779 @param location the delivery location
780 @param category the product category instead of self.taskname"""
784 """!Flag: output genesis vitals?"""
785 return self.
_flags[
'write_vit']
789 write_vit=property(getwrite_vit,
None,
None,
'Flag: output genesis vitals?')
791 """!Flag: output phase info?"""
792 return self.
_flags[
'phaseflag']
795 phaseflag=property(getphaseflag,
None,
None,
'Flag: output phase info?')
797 """!Flag: output structure info?"""
798 return self.
_flags[
'structflag']
801 structflag=property(getstructflag,
None,
None,
802 'Flag: output storm structure information?')
804 """!Flag: output IKE info?"""
805 return self.
_flags[
'ikeflag']
809 ikeflag=property(getikeflag,
None,
None,
810 'Flag: output storm IKE information?')
812 """!Flag: run in real-time mode?"""
813 return self.
_flags[
'realtime']
817 realtime=property(getrealtime,
None,
None,
'Flag: run in real-time mode?')
820 """!Loops through the given gribtask, getting all products with
821 specified name (gribname) for the times requested in the
822 TrackerBase constructor. Will raise MissingGRIBError if any
823 times have no product. Returns a TimeArray of Product
825 @param gribtask the hwrf.gribtask.GRIBTask to query
826 @param gribname the grib product name"""
828 dt=to_fraction(self.
_step)/10
829 for rtime
in gribs.times():
830 for ptime,product
in gribtask.products(name=gribname,time=rtime,
832 timediff=abs(to_fraction(ptime-rtime,negok=
True))
833 self.
log().debug(
'%s timediff %s at time %s'%
834 (gribname,str(timediff),rtime.strftime(
837 self.
log().debug(
'%s: no %s at time %s'%
838 ( gribtask.taskname,gribname,
839 rtime.strftime(
'%Y%m%d.%H%M%S') ))
842 self.
log().debug(
'%s: found %s at time %s'%
843 ( gribtask.taskname,gribname,
844 rtime.strftime(
'%Y%m%d.%H%M%S') ))
850 """!Tells the tracker to run in moving grid mode, for the
853 @param tcvitals The tcvitals must be an hwrf.storminfo.StormInfo.
854 @param gribtask The gribtask must be an hwrf.gribtask.GRIBTask.
855 @param gribname The gribname is the name in the
856 hwrf.regrib.RegribMany of the operation that produces the
857 tracker input for this moving storm."""
860 'Cannot mix stationary regional and moving grids in one '
861 'tracker invocation.')
864 'Cannot have multiple moving grids: the current tracker '
865 'implementation does not support that. Use multiple '
866 'TrackerTask objects instead.')
870 stid=tcvitals.longstormid
873 '%s: More than one of this storm specified.'%(stid,))
883 """!Adds a stationary grid on which to run the tracker.
884 @param gribtask The gribtask must be an hwrf.gribtask.GRIBTask
886 @param gribname The gribname is the operation name in the grib task's
887 hwrf.regrib.RegribMany for the GRIB data source."""
890 'Cannot mix stationary regional and moving grids in one '
891 'tracker invocation.')
897 """!Specifies the tcvitals for a storm to track.
898 @param tcvitals the hwrf.storminfo.StormInfo to use"""
901 'Specify tracker mode before adding storms. Call '
902 'add_moving_grid or add_regional_grid before add_storm')
905 'In moving grid mode, storms must have associated grid '
906 'data. Use add_moving_grid instead of add_storm.')
907 self.
_vitals[tcvitals.longstormid]=tcvitals
910 """!Symbolically links all input GRIB1/2 files to the current
911 working directory using the file naming convention expected by
912 the tracker. If self.realtime=False, then will also check for
913 the files' existence and will raise MissingGRIBError if one is
914 missing. If self.realtime=True, then it is assumed the
915 tracker is being run in waitfor mode and will wait for the
916 data on its own. In either case, GRIBLocationError will be
917 raised if any GRIB's Product.location is None or '' (the
918 location must be known to make the symlink).
919 @param inputlist a file to create with the list of tracker
922 for stid
in self._grids.keys():
924 logger.info(
'process storm %s'%(strstid,))
926 longstormid=str(self.
_vitals[stid].longstormid)
928 longstormid=
'regional'
930 atcfymdh=int(self._nml.nl_get(
'atcfinfo',
'atcfymdh'))
931 linkstart=
'%s.%s.%s.%10d.f' % \
933 for (ftime,grib)
in grids.iteritems():
934 logger.info(
'storm %s time %s'%(strstid,str(ftime)))
936 while not grib.check():
937 logger.info(
'%s: not yet available. Sleeping.')
940 locix=grib.grib1grbindex
941 if loc
is None or loc==
'':
942 raise GRIBLocationError(
943 '%s: no location specified for this GRIB product.'
945 minutes=int(to_fraction(ftime-self.
_start)/60)
947 linkstart,minutes),logger=logger)
948 if locix
is None or locix==
'':
950 '%s: no grbindex file location specified for this '
951 'GRIB product. Assuming .grbindex extension of '
952 'the GRIB file.'%(grib.did,))
953 locix=
'%s.grbindex'%(loc,)
955 locix,
'%s%05.5d.ix'%(linkstart,minutes),logger=logger)
956 with open(inputlist,
'wt')
as f:
958 for ftime
in self.
_grids[self._grids.keys()[0]].datatimes():
960 minute=int(to_fraction(ftime-self.
_start)/60)
961 f.write(
"%4d %5d\n"%(i,minute))
964 """!Concats all input GRIB1/2 files into a single file, waiting
965 for each one to become available first. Produces the grbindex
966 output for that file at the end. Only works in regional grid
967 mode, and will raise TrackerModeError if the mode is unknown
970 @note This routine is untested because it is a bad idea.
971 It results in duplication of an enormous amount of data.
972 However, historically, this is how the tracker used to be
974 @param filename the input GRIB1 file name
975 @param ixfilename the output index file name"""
976 with open(filename,
'wb')
as outf:
978 for product
in self._gribs:
979 while not product.check():
980 self.
log().info(
'%s: not yet available. Sleeping...'
983 with open(filename,
'ab')
as outf:
984 with open(str(product.location()),
'rb')
as inf:
985 shutil.copyfileobj(inf,outf)
986 grbindex=alias(bigexe(self.
getexe(
'grbindex')))
987 if self.
confbool(
'sync_frequently',
True):
989 checkrun(grbindex[
'filename',
'ixfilename'])
992 """!Construct the tracker namelist and write it to the
993 specified file. The "realtime" flag is True when running in
994 real-time mode, and False if not.
995 @param filename the output namelist file's name"""
1002 nlsiu=nml.nl_set_if_unset
1004 ns(
'datein',
'inp%bcc',int(tm.year)/100)
1005 ns(
'datein',
'inp%byy',int(tm.year)%100)
1006 ns(
'datein',
'inp%bmm',int(tm.month))
1007 ns(
'datein',
'inp%bdd',int(tm.day))
1008 ns(
'datein',
'inp%bhh',int(tm.hour))
1009 nlsiu(
'datein',
'inp%modtyp',
'regional')
1010 nlsiu(
'datein',
'inp%lt_units',
'hours')
1011 ns(
'datein',
'inp%file_seq',
'multi')
1012 ns(
'datein',
'inp%nesttyp',
1013 'moveable' if(self.
_moving)
else 'stopped')
1014 nlsiu(
'atcfinfo',
'atcfnum',81)
1015 nlsiu(
'atcfinfo',
'atcfname',
'TEST')
1017 hours=int(to_fraction(self.
_step)/3600)
1018 minutes=int(to_fraction(self.
_step)/60-60*hours)
1019 ns(
'atcfinfo',
'atcffreq',hours*100+minutes)
1020 nlsiu(
'trackerinfo',
'trkrinfo%type',
'tracker')
1021 nlsiu(
'trackerinfo',
'trkrinfo%mslpthresh',0.0015)
1022 nlsiu(
'trackerinfo',
'trkrinfo%v850thresh',1.5000)
1023 nlsiu(
'trackerinfo',
'trkrinfo%gridtype',
'regional')
1024 nlsiu(
'trackerinfo',
'trkrinfo%contint',100.0)
1025 nlsiu(
'trackerinfo',
'trkrinfo%out_vit',
'y' if(self.
write_vit)
else 'n')
1026 nlsiu(
'trackerinfo',
'trkrinfo%gribver',1)
1027 nlsiu(
'trackerinfo',
'trkrinfo%g2_jpdtn',0)
1028 nlsiu(
'phaseinfo',
'phaseflag',
'y' if(self.
phaseflag)
else 'n')
1029 nlsiu(
'phaseinfo',
'phasescheme',
'both')
1030 nlsiu(
'phaseinfo',
'wcore_depth',1.0)
1031 nlsiu(
'structinfo',
'structflag',
'y' if (self.
structflag)
else 'n')
1032 nlsiu(
'structinfo',
'ikeflag',
'y' if(self.
ikeflag)
else 'n')
1034 for stid
in self._grids.keys():
1036 longstormid=str(self.
_vitals[stid].longstormid)
1038 longstormid=
'regional'
1040 ns(
'fnameinfo',
'atcfdescr',longstormid)
1041 nlsiu(
'verbose',
'verb',3)
1042 ns(
'waitinfo',
'use_waitfor', (
'y' if(realtime)
else 'n') )
1043 nlsiu(
'waitinfo',
'wait_min_age',10)
1044 nlsiu(
'waitinfo',
'wait_min_size',100)
1045 nlsiu(
'waitinfo',
'wait_max_wait',1800)
1046 nlsiu(
'waitinfo',
'wait_sleeptime',5)
1049 ns(
'waitinfo',
'use_per_fcst_command',
'y')
1050 ns(
'waitinfo',
'per_fcst_command',
"./deliver %[FHOUR] %[FMIN]")
1052 ns(
'waitinfo',
'use_per_fcst_command',
'n')
1057 order=[
'datein',
'atcfinfo',
'trackerinfo',
'phaseinfo',
'structinfo',
1058 'fnameinfo',
'verbose',
'waitinfo']
1059 Odatein=[
'inp%bcc',
'inp%byy',
'inp%bmm',
'inp%bdd',
'inp%bhh',
1060 'inp%model',
'inp%modtyp',
'inp%lt_units',
'inp%file_seq',
1062 Oatcfinfo=[
'atcfnum',
'atcfname',
'atcfymdh',
'atcffreq']
1063 Otrackerinfo=[
'trkrinfo%westbd',
'trkrinfo%eastbd',
1064 'trkrinfo%northbd',
'trkrinfo%southbd',
1065 'trkrinfo%type',
'trkrinfo%mslpthresh',
1066 'trkrinfo%v850thresh',
'trkrinfo%gridtype',
1067 'trkrinfo%contint',
'trkrinfo%out_vit']
1068 Ophaseinfo=[
'phaseflag',
'phasescheme',
'wcore_depth']
1069 Ostructinfo=[
'structflag',
'ikeflag']
1070 Ofnameinfo=[
'gmodname',
'rundescr',
'atcfdescr']
1071 Owaitinfo=[
'use_waitfor',
'wait_min_age',
'wait_min_size',
1072 'wait_sleeptime',
'use_per_fcst_command',
1074 varorder={
'datein':partial_ordering(Odatein),
1075 'atcfinfo':partial_ordering(Oatcfinfo),
1076 'trackerinfo':partial_ordering(Otrackerinfo),
1077 'phaseinfo':partial_ordering(Ophaseinfo),
1078 'structinfo':partial_ordering(Ostructinfo),
1079 'fnameinfo':partial_ordering(Ofnameinfo),
1080 'verbose':partial_ordering([
'verb']),
1081 'waitinfo':partial_ordering(Owaitinfo) }
1082 with open(str(filename),
'wt')
as f:
1083 f.write(nml.make_namelist(section_sorter=partial_ordering(order),
1084 var_sorters=varorder))
1087 """!Runs the tracker and delivers the resulting track files."""
1089 gettrk=alias(bigexe(self.
getexe(
'gettrk')))
1096 if os.path.exists(wd):
1100 logger.info(
'link tracker input')
1103 with open(
'deliver',
'wt')
as f:
1104 f.write(tracker_delivery_script.format(
1105 dstorepath=self.dstore.filename,
1107 ofstat=os.fstat(f.fileno())
1108 os.fchmod(f.fileno(),ofstat.st_mode|755)
1111 with open(
'input.vitals',
'wt')
as vitf:
1112 for stid,vital
in self._vitals.iteritems():
1113 vitf.write(vital.as_tcvitals())
1118 for empty
in [13,14,11,31]:
1119 with open(
'fort.%d'%(empty,),
'wb')
as f:
1123 logger.info(
'make tracker namelist')
1125 assert(isnonempty(
'input.nml'))
1132 15:
'input.fcsttime',
1135 62:
'%s.atcf'%(pre,),
1136 63:
'%s.radii'%(pre,),
1137 64:
'%s.atcfunix'%(pre,),
1138 66:
'%s.atcf_gen'%(pre,),
1139 68:
'%s.atcf_sink'%(pre,),
1140 69:
'%s.atcf_hfip'%(pre,),
1143 if self.
write_vit: linkme[67]=
'%s.genvitals'%(pre,)
1144 if self.
phaseflag: linkme[71]=
'%s.cps_parms'%(pre,)
1145 if self.
ikeflag: linkme[74]=
'%s.ike'%(pre,)
1148 linkme[72]=
'%s.structure'%(pre,)
1149 linkme[73]=
'%s.fractwind'%(pre,)
1150 linkme[76]=
'%s.pdfwind'%(pre,)
1152 for forti
in sorted(linkme.keys()):
1153 logger.info(
'Link: fort.%d = %s'%(forti,linkme[forti]))
1159 self.
postmsg(
'Starting gettrk.')
1160 cmd=(gettrk <
'./input.nml' )
1162 cmd = cmd >=
'tracker.log'
1163 logger.warning(repr(cmd))
1165 if self.
confbool(
'sync_frequently',
True):
1168 self.
postmsg(
'Successful return status from gettrk.')
1169 except(Exception)
as e:
1170 logger.critical(
'GETTRK FAILED: '+str(e),exc_info=
True)
1175 tgt=os.path.join(self.
outdir,
'tracker.log')
1176 deliver_file(
'tracker.log',tgt,keep=
True,logger=logger)
1181 """!Calls callbacks for all completed products.
1182 @param logger a logging.Logger for log messages"""
1183 if logger
is None: logger=self.
log()
1188 if not prod.has_callbacks():
1189 logger.warning(
'%s: rejoice: completed!! (no '
1190 'callbacks)'%(prod.did,))
1192 logger.warning(
'%s: rejoice: completed!! (has '
1193 'callbacks)'%(prod.did,))
1194 prod.call_callbacks()
1196 logger.error(
'%s: tracker product is not complete.'
def send_raw_atcfunix
Requests delivery of the raw, unmodified atcfunix file to the specified location. ...
Change directory, handle temporary directories.
ikeflag
Output storm IKE information? Read-only property.
This module provides a set of utility functions to do filesystem operations.
def hwrf_global_subset(w)
A simple wrapper around hwrf_diagnostic_subset that provides a subset of the HWRF fields and vertical...
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Generates a Fortran namelist entirely from config files.
def jtwc_rewrite
Rewrites track files as used by the HWRF WPAC parallels from 2013-2015.
def add_storm(self, tcvitals)
Specifies the tcvitals for a storm to track.
This Task is used by the delivery script executed by the gettrk program itself.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def concat_grib(self, filename, ixfilename)
Concatenates all input GRIB1/2 files into a single file, waiting for each one to become available first...
def redirect(self)
Should subprograms' outputs be redirected to separate files?
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
def __init__(self, op)
GRIB1VintTave constructor.
def getphaseflag(self)
Flag: output phase info?
A FileProduct that delivers a subset of a track.
This task runs the GFDL Vortex Tracker on HWRF output.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def product(self, name)
Returns the tracker Product with the specified product name or raises KeyError if none is found...
This subclass of GRIBOp produces GRIB1 files.
def action_vinttave(self, regrib, ingrib, time=None, task=None, blocksize=1048576, atime=None, kwargs)
Performs the actual work of GRIB1VintTave.make: copying input, running vint and tave, copying output.
def deliver
Reads the tracker output, discarding everything after the specified forecast hour.
def deliver
Copies the tracker output to its destination without modification.
def send_atcfunix
Requests delivery of a cleaned-up version of the full length atcfunix file.
def add_regional_grid(self, gribtask, gribname)
Adds a stationary grid on which to run the tracker.
def action_expandlatlon(op, regrib, name, center, west, east, north, south, n0, n1, scan, res0, res1, kwargs)
Internal function that expands a lat-lon grid to make a bigger one.
def hwrf_combine_subset(w)
A simple wrapper around hwrf_diagnostic_subset that provides a subset of the HWRF fields and vertical...
Represents a process or actor that makes a Product.
def getstructflag(self)
Flag: output structure info?
realtime
Run in real-time mode? Read-only property.
def log(self)
Returns the logger object for this task.
def send_atcfunix_subset
Requests delivery of a cleaned subset of the tracker atcfunix file, only up to the specified forecast...
def confbool
Alias for self.conf.getbool for section self.section.
section
The confsection in self.section for this HWRFTask (read-only)
def tracker_subset(wgribout)
This is a GRIB subsetter intended to be sent to the subset operator of an hwrf.regrib.GRIB1Op object, to produce GRIB1 files that contain only fields needed by the tracker.
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
def args(self)
Iterates over all child GRIBBase objects.
fmin
Last expected forecast minute.
outdir
The directory in which this task should deliver its final output.
structflag
Output storm structure information? Read-only property.
def get
Alias for self.meta(). Returns the value of the specified metadata key or returns default if it is uns...
fhr
Last expected forecast hour.
This class represents a GRIBGrid that must be computed from some other input, usually a GRIB file...
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
def expandlatlon
Creates a GRID suitable for use with the hwrf.regrib module.
def getrealtime(self)
Flag: run in real-time mode?
Stores products and tasks in an sqlite3 database file.
location
Read-write property, an alias for getlocation() and setlocation().
This subclass of TempDir takes a directory name, instead of generating one automatically.
Time manipulation and other numerical routines.
Raised when an impossible tracker configuration is requested, such as running with a grid that is bot...
def run(self)
Performs the actual delivery.
write_vit
Output genesis vitals? Read-only property.
def fortlink
This is a convenience routine that makes many symbolic links to fort.N files for various integers N u...
workdir
The directory in which this task should be run.
def call_completed_callbacks
Calls callbacks for all completed products.
def getwrite_vit(self)
Flag: output genesis vitals?
string NOVITALS
Constant used to represent the lack of vitals data.
def make(self, regrib, kwargs)
This runs the actual vint and tave programs on the GRIB1 file that comes from the argument provided t...
def log
Obtain a logging domain.
def run(self)
Runs the tracker and delivers the resulting track files.
This subclass of GRIB1Op represents a GRIB1 file on disk that is ALREADY PRESENT. ...
def __init__(self, dstore, taskname, fhr, fmin, logger=None, kwargs)
Creates a TrackerView for the specified datastore and taskname.
A FileProduct for tracker output.
A time-indexed array that can only handle equally spaced times.
This class represents a GRIB1 or GRIB2 grid specification.
def getikeflag(self)
Flag: output IKE info?
def __init__(self, kwargs)
Create a CleanATCFSubsetProduct that will deliver a track with the specified last allowable forecast ...
def add_moving_grid(self, tcvitals, gribtask, gribname)
Tells the tracker to run in moving grid mode, for the specified storm.
def products
Iterates over all tracker Products.
did
Read-only property, an alias for getid().
def process_line(self, tracker, lastfhr, freq, cut, line)
Processes one line of the track file.
def grab_gribs(self, gribtask, gribname)
Loops through the given gribtask, getting all products with specified name (gribname) for the times r...
def make_namelist(self, filename)
Construct the tracker namelist and write it to the specified file.
Exceptions raised by the hwrf package.
def postmsg(self, message, args, kwargs)
Same as produtil.log.jlogger.info().
Describes regribbing operations using an algebraic structure.
def call_callbacks
Calls all delivery callback functions.
def has_callbacks(self)
Returns True if this Product has any callback functions and False otherwise.
phaseflag
Output phase info? Read-only property.
Raised when multiple storms are requested, but only one was expected.
def link_grib(self, inputlist)
Symbolically links all input GRIB1/2 files to the current working directory using the file naming con...
A GRIB1Op that runs vint and tave: two of the tracker prep programs.
def hwrf_diagnostic_subset
Selects lines for the hwrfprs_c files.
def __init__(self, ds, conf, section, start, end, step, write_vit=True, phaseflag=True, structflag=False, ikeflag=False, realtime=True, kwargs)
Creates a new TrackerTask.
def make_symlink
Creates a symbolic link "target" that points to "source".