HWRF  trunk@4391
tracker.py
1 """!This module contains tasks to prepare input for the GFDL Vortex
2 Tracker, run the tracker and deliver its results."""
3 
4 ##@var __all__
5 # List of symbols to export by "from hwrf.tracker import *"
6 __all__ = [ 'hwrf_combine_subset', 'tracker_subset', 'expandlatlon',
7  'vinttave','TrackerTask','GRIB1VintTave' , 'jtwc_rewrite']
8 
9 import re, shutil, os, tempfile, math, datetime, sys
12 
13 from hwrf.numerics import TimeArray, to_fraction, to_datetime, \
14  to_datetime_rel, partial_ordering, to_timedelta
15 
16 from hwrf.regrib import GRIBOp, GRIB1Op
17 from produtil.run import runstr,bigexe,alias,checkrun,runsync
18 from produtil.cd import TempDir, NamedDir
19 from produtil.fileop import isnonempty, deliver_file
20 
21 from hwrf.exceptions import TrackerError,TrackerModeError,TrackerStormError, \
22  MissingGRIBError,GRIBLocationError
23 
24 ########################################################################
25 
26 def jtwc_rewrite(inatcf,outatcf,logger=None):
27  """!Rewrites track files as used by the HWRF WPAC parallels from 2013-2015
28  @param inatcf input ATCF file
29  @param outatcf output ATCF file
30  @param logger a logging.Logger for log messages"""
31  with open(inatcf,'rt') as inf:
32  with open(outatcf,'wt') as outf:
33  for line in inf:
34  line=line.rstrip() \
35  .replace('AA','IO')\
36  .replace('BB','IO')\
37  .replace('SP','SH')\
38  .replace('SI','SH')\
39  .replace('-9,',' 0,')\
40  .replace('-99,',' 0,')\
41  .replace('-999,',' 0,')\
42  .replace('-9999,',' 0,')\
43  [0:112]
44  hour=line[30:33]
45  ihour=int(hour)
46  if ihour%3==0:
47  if logger is not None:
48  logger.info('ATCF LINE: %s'%(line,))
49  print>>outf,line
50  elif logger is not None:
51  logger.info('SKIP LINE: %s'%(line,))
52 
53 
54 ########################################################################
55 
56 def hwrf_diagnostic_subset(wgribout,hurricane_region=False):
57  """!Selects lines for the hwrfprs_c files.
58 
59  This subroutine parses an array of lines from wgrib -s and selects
60  lines to send into the hwrfprs_c and hwrfsat_c GRIB1/2 output
61  files (hurricane_region=True) or hwrfprs_g and hwrfsat_g files
62  (hurricane_region=False). Long ago (in 2011) this was the input
63  to the tracker. In the current system, it contains fields the
64  tracker does not need, lacks some fields it does need, and is in
65  the wrong order for it to be accepted as tracker input. It
66  contains synthetic satellite and other diagnostic quantities that
67  are useful for various data analysis in the high resolution
68  combined grid. The tracker scripts no longer use this output set
69  for any purpose, but it is still valuable for diagnostics. This
70  is in the hwrf.tracker module for historical reasons: the
71  hwrfprs_c files were originally intended for the tracker.
72  @param wgribout output from wgrib
73  @param hurricane_region if True, a more limited set of variables is kept"""
74 
75  if hurricane_region:
76  # Skip specific humidity to save space. T and Z are handled
77  # in a later loop.
78  var3d=['UGRD','VGRD','RH','ABSV']
79  else:
80  # Include SPFH for direct comparison to GFS, and include RH
81  # for comparison to obs. Include Z and T since the later loop
82  # does not add them.
83  var3d=['UGRD','VGRD','RH','SPFH','ABSV','HGT','TMP']
84 
85  # Interesting levels of some popular variables:
86  for lev in [ 1000, 850, 700, 500, 250 ]:
87  for var in var3d:
88  for line in wgribout:
89  if line.find('%s:%d mb:'%(var,lev))>=0:
90  yield line
91 
92  # Extra levels for Z and T for diagnosing warm core when
93  # hurricane_region is True:
94  if hurricane_region:
95  for lev in [ 250, 300, 350, 400, 450, 500, 600, 700, 850, 1000 ]:
96  for var in ['HGT','TMP']:
97  for line in wgribout:
98  if line.find('%s:%d mb:'%(var,lev))>=0:
99  yield line
100 
101  # A bunch of 2D fields:
102  for line in wgribout:
103  if line.find(' mb:')<0 and \
104  re.search('PRMSL|SBT|BRT|AMSRE|trop|HPBL|PWAT'
105  '|CAPE|CIN|column|REFC|HLCY|GRD:10 m above'
106  '|TPCP:sfc|TMP:2 m above|RH:2 m above',line):
107  yield line
108 
110  """!A simple wrapper around hwrf_diagnostic_subset that provides a
111  subset of the HWRF fields and vertical levels suitable for
112  analyzing the large-scale hurricane structure.
113  @param w output from wgrib"""
114  for line in hwrf_diagnostic_subset(w,True):
115  yield line
116 
118  """!A simple wrapper around hwrf_diagnostic_subset that provides a
119  subset of the HWRF fields and vertical levels suitable for
120  analyzing the synoptic-scale or global fields and comparison to
121  the parent global model.
122  @param w output from wgrib"""
123  for line in hwrf_diagnostic_subset(w,False):
124  yield line
125 
126 ########################################################################
127 
128 ##@var TRACKER_SUBSET
129 #The subset of fields needed from an HWRF GRIB1 file in order to run
130 #the tracker. This is used by the tracker_subset function.
131 TRACKER_SUBSET=[ 'HGT:925', 'HGT:850', 'HGT:700', 'UGRD:850', 'UGRD:700',
132  'UGRD:500', 'VGRD:850', 'VGRD:700', 'VGRD:500',
133  'UGRD:10 m ', 'VGRD:10 m ', 'ABSV:850', 'ABSV:700',
134  'PRMSL', 'HGT:900', 'HGT:800', 'HGT:750', 'HGT:650',
135  'HGT:600', 'HGT:550', 'HGT:500', 'HGT:450', 'HGT:400',
136  'HGT:350', 'HGT:300', 'HGT:250', 'TMP:500', 'TMP:450',
137  'TMP:400', 'TMP:350', 'TMP:300', 'TMP:250' ]
138 """The subset of fields needed from an HWRF GRIB1 file in order to run
139 the tracker. This is used by the tracker_subset function."""
140 
141 def tracker_subset(wgribout):
142  """!This is a GRIB subsetter intended to be sent to the subset
143  operator of an hwrf.regrib.GRIB1Op object, to produce GRIB1 files
144  that contain only fields needed by the tracker.
145 
146  This function iterates over wgrib -s output lines. It subsets the
147  lines, returning the ones the tracker expects, in the exact order
148  the tracker expects them. The input is an array of lines from
149  wgrib -s. This is intended to be used with the gribtask and
150  regrib modules by doing gribop/tracker_subset to produce a GRIB1Op
151  that will make a tracker subset of the given GRIB1 file.
152  @param wgribout output from wgrib"""
153  for findme in TRACKER_SUBSET:
154  for line in wgribout:
155  if findme in line:
156  yield line
157  break
158 
159 ########################################################################
160 
161 # expandlatlon - the tracker scripts read the lat-lon grid information
162 # and expand the grid by 2.5 or 4.0 degrees. This allows mimicing the
163 # behavior:
164 
165 def action_expandlatlon(op,regrib,name,center,west,east,north,south,n0,n1,
166  scan,res0,res1,**kwargs):
167  """!Internal function that expands a lat-lon grid to make a bigger one.
168 
169  This is part of the implementation of expandlatlon and should not
170  be called directly. Given the input grid from center.grib1grid,
171  expand the grid in all four compass directions by a specified
172  number of degrees, and produce a new grid with a different
173  resolution, scanning mode and gridpoint count
174 
175  @param op the hwrf.regrib.GRIBOp being run
176  @param regrib the hwrf.regrib.Regrib for data storage
177  @param name the grid name
178  @param center the GRIB1 product being centered upon
179  @param west,east,north,south how many degrees to add in each direction (a float)
180  @param n0,n1 gridpoint counts for the GRIB1 grid 255
181  @param scan scanning mode flags for the GRIB1 grid 255
182  @param res0,res1 resolution information for GRIB1 grid 255
183  @param kwargs Ignored."""
184  ingrid=center.grib1grid
185  if regrib.logger is not None:
186  regrib.logger.info('expandlatlon of input grid %s'%(ingrid,))
187  #255 0 501 401 22042 103995 136 10042 118995 30 30 0'
188  oldgrid=[int(x) for x in ingrid.split()]
189  #[255, 0, 501, 401, 21405, 116717, 136, 9405, 131717, 30, 30, 0]
190  ( ogid, ojunk1, onscount, oewcount, onlat, oelon, oscan, oslat, \
191  owlon, onsres, oewres, ojunk2 ) = oldgrid
192 
193  # Define a convenience function to simplify this code. We need to
194  # do int(round()) to mimic awk's math:
195 
196 
197  if 0 != (oscan&128): (owlon,oelon) = (oelon,owlon)
198  if 0 != (oscan&64): (onlat,oslon) = (oslon,onlat)
199  if 0 != (oscan&32): (onsres,oewres) = (oewres,onsres)
200 
201  (iwlon,ielon)=(owlon-int(1000*west), oelon+int(1000*east))
202  (islat,inlat)=(oslat-int(1000*south),onlat+int(1000*north))
203 
204  iscan=int(scan)
205  if 0 != (iscan&128): (iwlon,ielon) = (ielon,iwlon)
206  if 0 != (iscan&64): (inlat,islat) = (islat,inlat)
207  if 0 != (iscan&32): (n0,n1) = (n1,n0)
208 
209  s = '255 0 %d %d %d %d %d %d %d %d %d 0' % \
210  ( int(n0), int(n1), inlat, ielon, iscan, islat, iwlon,
211  int(1000*res0), int(1000*res1) )
212  if regrib.logger is not None:
213  regrib.logger.info('expandlatlon output grid is %s'%(s,))
214  return hwrf.regrib.GRIBGrid(name,s,None)
215 
216 def expandlatlon(center,west,east,north,south,n,scan,res,name=None):
217  """!Creates a GRID suitable for use with the hwrf.regrib module.
218  Given the input grid from center.grib1grid, expand the grid in all
219  four compass directions by a specified number of degrees, and
220  produce a new grid with a different resolution, scanning mode and
221  gridpoint count
222  @param center the hwrf.regrib.GRIBBase on which to center
223  @param west,east,north,south number of degrees to expand
224  @param n a pair of ints, the GRIB1 grid 255 gridpoint count information
225  @param scan the GRIB1 grid 255 scanning mode information
226  @param res a pair of ints, the GRIB1 grid 255 resolution information
227  @param name the name of the grid
228  @returns a new hwrf.regrib.GRIBGridCompute that will compute
229  the expanded grid for any forecast time for which the center
230  object can provide inputs"""
231  assert(len(n)==2 and n[0]>0 and n[1]>0)
232  assert(scan>=0 and scan<=255)
233  assert(len(res)==2 and res[0]>0 and res[1]>0)
234  if name is None:
235  name='elatlon%dx%dp'%(int(n[0]),int(n[1]))
236  return hwrf.regrib.GRIBGridCompute(name,action_expandlatlon,center,
237  west,east,north,south,n[0],n[1],scan,res[0],res[1])
238 
239 ########################################################################
240 
242  """!A GRIB1Op that runs vint and tave: two of the tracker prep
243  programs. They interpolate data vertically and temporally to
244  produce supplemented input GRIB files needed by the tracker
245  thermodynamic diagnostic output fields."""
246  def __init__(self,op):
247  """!GRIB1VintTave constructor"""
248  super(GRIB1VintTave,self).__init__(None,op)
249  assert('_args' in self.__dict__)
250  def make(self,regrib,**kwargs):
251  """!This runs the actual vint and tave programs on the GRIB1
252  file that comes from the argument provided to the constructor.
253  @param regrib the hwrf.regrib.Regrib for data storage information
254  @param kwargs passed to self.arg.make()"""
255  ingrib=None
256  for arg in self.args():
257  ingrib=arg.make(regrib,**kwargs)
258  break
259  assert(ingrib is not None)
260  return self.action_vinttave(regrib,ingrib,**kwargs)
261  def action_vinttave(self,regrib,ingrib,time=None,task=None,
262  blocksize=1048576,atime=None,**kwargs):
263  """!Performs the actual work of GRIB1VintTave.make: copying
264  input, running vint and tave, copying output. Do not call
265  directly.
266  @protected
267  @param regrib The hwrf.regrib.Regrib for data storage information
268  @param ingrib the input GRIBBase that provides input files
269  @param time the time at which the data is being processed
270  @param task the hwrf.gribtask.GRIBTask that stores final output
271  @param blocksize a block size for I/O
272  @param atime analysis time
273  @param kwargs Ignored."""
274  infile=ingrib.grib1file
275  grid=ingrib.grib1grid
276 
277  if atime is None and task is not None and 'atime' in task.__dict__:
278  atime=task.atime
279 
280  assert(grid is not None)
281  assert(grid!='')
282 
283  # Request some temporary filenames for the numerous input and
284  # output files from these programs:
285  inindex = regrib.tempfile('grbindex')
286  znamelist = regrib.tempfile('vint_input')
287  tnamelist = regrib.tempfile('tint_input')
288  tave_namelist = regrib.tempfile('tave_input')
289  (zfile,zindex) = regrib.gribtemp('vint.z.grb1')
290  (tfile,tindex) = regrib.gribtemp('vint.t.grb1')
291  (tavefile,taveindex) = regrib.gribtemp('tave.grb1')
292  (outfile,outindex) = regrib.gribtemp('vinttave')
293  outgrbindex=regrib.tempfile('vinttave.ix')
294 
295  if regrib.logger is not None:
296  regrib.logger.info('vint-tave starting: infile=%s '
297  'inindex=%s grid=%s'
298  %(repr(infile),repr(inindex),repr(grid)))
299 
300  # Get the executables:
301  copygb=regrib.copygb
302  wgrib=regrib.wgrib
303  vint=alias(bigexe(task.getexe('vint')))
304  tave=alias(bigexe(task.getexe('tave')))
305  grbindex=alias(bigexe(task.getexe('grbindex')))
306 
307  logger=regrib.logger
308 
309  # And make some aliases to simplify the rest of this code:
310  fl=produtil.fileop.fortlink #fl = make a symlink
311  if logger is not None:
312  logger.info('ftime for vinttave is %s'%(str(time),))
313  # fahr = forecast-analysis in hrs.
314  fahr=task.conftimestrinterp('{fahr}',ftime=time,atime=atime)
315  def runit(cmd,runner=checkrun):
316  if logger is not None: logger.info(repr(cmd))
317  return runner(cmd)
318 
319  # Run grbindex on the input file:
320  if regrib.logger is not None:
321  regrib.logger.info('generate grbindex file %s'%(inindex,))
322  runit(grbindex[infile,inindex])
323 
324  with open(znamelist,'wt') as f:
325  nmltxt='&timein ifcsthour=%s, iparm=7 , gribver=1, '\
326  'g2_jpdtn=0/\n'%(fahr,)
327  f.write(nmltxt)
328  if logger is not None: logger.info('znamelist: '+nmltxt)
329  fl({11:infile, 16:task.confstr('hgt_levs'), 31:inindex, 51:zfile},
330  force=True)
331  runit(vint < znamelist)
332  runit(grbindex[zfile,zindex])
333 
334  with open(tnamelist,'wt') as f:
335  nmltxt='&timein ifcsthour=%s, iparm=11 , gribver=1, '\
336  'g2_jpdtn=0/\n'%(fahr,)
337  f.write(nmltxt)
338  if logger is not None: logger.info('tnamelist: '+nmltxt)
339  fl({11:infile, 16:task.confstr('tmp_levs'),
340  31:inindex, 51:tfile},force=True)
341  runit(vint < tnamelist)
342  runit(grbindex[tfile,tindex])
343 
344  with open(tave_namelist,'wt') as f:
345  nmltxt='&timein ifcsthour=%s, iparm=11 , gribver=1, '\
346  'g2_jpdtn=0/\n'%(fahr,)
347  f.write(nmltxt)
348  if logger is not None: logger.info(nmltxt)
349  fl({11:tfile, 31:tindex, 51:tavefile},force=True)
350  runit(tave < tave_namelist)
351  runit(grbindex[tavefile,taveindex])
352 
353  dashg='-g%s'%(grid,)
354  runit(copygb[dashg,'-k4*-1 33 100 850',infile,inindex,outfile+'.u850'])
355  runit(copygb[dashg,'-k4*-1 33 100 700',infile,inindex,outfile+'.u700'])
356  runit(copygb[dashg,'-k4*-1 33 100 500',infile,inindex,outfile+'.u500'])
357  runit(copygb[dashg,'-k4*-1 33 105 10',infile,inindex,outfile+'.u10m'])
358  runit(copygb[dashg,'-k4*-1 7 100 850',infile,inindex,outfile+'.z850'])
359  runit(copygb[dashg,'-k4*-1 7 100 700',infile,inindex,outfile+'.z700'])
360  runit(copygb[dashg,'-k4*-1 2 102 0',infile,inindex,outfile+'.mslp'])
361  runit(copygb[dashg, zfile, zindex, outfile+'.phase'])
362  runit(copygb[dashg, tavefile,taveindex,outfile+'.tave'])
363 
364  with open(outfile,'wb') as outf:
365  for ext in ['u850','u700','u500','z850','z700',
366  'mslp','u10m','phase','tave']:
367  partname='%s.%s'%(outfile,ext)
368  if logger is not None and not \
369  produtil.fileop.isnonempty(partname):
370  logger.warning(
371  '%s: input file is empty! Things may break later.'
372  %(partname,))
373  with open(partname,'rb') as inf:
374  shutil.copyfileobj(inf,outf)
375 
376  runit(grbindex[outfile,outgrbindex])
377  if logger is not None:
378  logger.info('vint-tave completed: infile=%s inindex=%s grid=%s'
379  %(repr(outfile),repr(outgrbindex),repr(grid)))
380 
381  retval=hwrf.regrib.GRIB1File(outfile,None,grid,
382  grib1grbindex=outgrbindex)
383  assert(retval is not None)
384  return retval
385 
386 ##@var vinttave
387 # An alias for GRIB1VintTave
388 vinttave=GRIB1VintTave # an alias for the old vinttave function
389 """An alias for GRIB1VintTave for backward compatibility"""
390 
391 ########################################################################
392 
394  """!A FileProduct for tracker output.
395 
396  This is part of the internal implementation of TrackerTask and
397  should not be used directly. It is a FileProduct that contains
398  the raw, unmodified tracker output"""
399  def deliver(self,tracker,fhr,logger=None):
400  """!Copies the tracker output to its destination without
401  modification. Sets availability and calls callbacks.
402  @param tracker the Task that ran the tracker
403  @param fhr the forecast hour,ignored
404  @param logger a logging.Logger for log messages"""
405  infile='output.atcfunix'
406  if logger is None: logger=tracker.log()
407  deliver_file(infile,self.location,keep=True,logger=logger)
408  self.available=True
409  if not self.has_callbacks():
410  logger.warning('%s: has no callbacks, so no DBN and no NHC delivery'%(self.did,))
411  else:
412  logger.warning('%s: has callbacks. Will call them.'%(self.did,))
413  self.call_callbacks(logger=logger)
414 
416  """!A FileProduct that delivers a subset of a track.
417 
418  This is part of the internal implementation of TrackerTask and
419  should not be used directly. It is a FileProduct that delivers a
420  cleaned-up, subsetted version of the tracker output."""
421  def __init__(self,**kwargs):
422  """!Create a CleanATCFSubsetProduct that will deliver a track
423  with the specified last allowable forecast hour.
424  @param kwargs passed to produtil.datastore.FileProduct.__init__()"""
425  super(CleanATCFSubsetProduct,self).__init__(**kwargs)
426  #assert(subset_fhr is not None)
427  #if subset_fhr is not None:
428  # self['subset_fhr']=str(subset_fhr)
429  def deliver(self,tracker,fhr,logger=None):
430  """!Reads the tracker output, discarding everything after the
431  specified forecast hour. If an invalid line is found after
432  time 0, that line and everything after it is discarded.
433 
434  @param tracker the task that ran the tracker
435  @param fhr the last forecast hour to include
436  @param logger a logging.Logger for messages"""
437  if logger is None: logger=tracker.log()
438  infile='output.atcfunix'
439  def get(s,v):
440  if s in self:
441  try:
442  return int(self[s])
443  except (KeyError,TypeError,ValueError) as e:
444  return v
445  fhr=get('subset_fhr',fhr)
446  freq=get('subset_freq',1)
447  cut=get('line_cut',None)
448 
449  with open(infile,'rt') as fin:
450  with tempfile.NamedTemporaryFile(delete=False,
451  mode='w+t',bufsize=1,prefix=str(self.location),dir='.') \
452  as fout:
453  outfile=fout.name
454  ofstat=os.fstat(fout.fileno())
455  os.fchmod(fout.fileno(),ofstat.st_mode|0644)
456  for inline in fin:
457  (outline,stop)=self.process_line(
458  tracker,fhr,freq,cut,inline)
459  # Now continue parsing the remaining lines
460  if stop: break # current hour is past last print hour
461  if outline is not None:
462  fout.write(outline)
463 
464  logger=tracker.log()
465  deliver_file(outfile,self.location,keep=False,logger=logger)
466  self.available=True
467  if not self.has_callbacks():
468  logger.warning('%s: has no callbacks, so no DBN and no NHC delivery'%(self.did,))
469  else:
470  logger.warning('%s: has callbacks. Will call them.'%(self.did,))
471  self.call_callbacks(logger=logger)
472  def process_line(self,tracker,lastfhr,freq,cut,line):
473  """!Processes one line of the track file. Decides if it is
474  time to cease processing of the track file (cut off the track)
475  or continue parsing.
476 
477  @param tracker the task that ran the tracker
478  @param lastfhr the last forecast hour to include
479  @param freq the output frequency in hours. This allows
480  one to turn an hourly track into a six hourly track
481  @param cut index of the last character to include from each line
482  For example, 112 would only include up to the RMW field.
483  @param line the line to process.
484  @return A tuple (line,stop) where "line" is the line to append
485  to the output track, and "stop" is true iff it is time to stop
486  parsing."""
487  try:
488  fhr=int(line[29:33])
489  wind=int(line[47:51])
490  if wind<=0:
491  if fhr>0:
492  tracker.log().error(
493  'Invalid wind in track file: wind is 0 at forecast '
494  'hour %s. Will cut off the track file at this '
495  'forecast hour.'%(repr(fhr),))
496  return (None,True)
497  elif freq>1 and abs(fhr%freq)>=1:
498  pass # not on an output time
499  elif cut is not None:
500  return (line.rstrip()[0:cut]+'\n',fhr>lastfhr)
501  else:
502  return (line,fhr>lastfhr)
503  except (ValueError,KeyError) as e:
504  # Invalid line: forecast hour or wind are missing or invalid.
505  rline=line.rstrip()
506  # Send a short error at ERROR level for the jlogfile, and
507  # one at WARNING for the master log.
508  tracker.log().error('Ignoring invalid ATCF line: %s'%(rline,))
509  tracker.log().warning('''Invalid ATCF line found. Wind or forecast hour are missing or invalid.
510  Line: %s
511  Error: %s'''
512  % (rline,str(e)), exc_info=True)
513  # We get here if there is invalid data at hour 0. We do not
514  # append the invalid line (None), and we do not stop
515  # processing (False).
516  return (None,False)
517 
518 ########################################################################
520  """!This Task is used by the delivery script executed by the gettrk
521  program itself. It derives directly from Task and has no
522  knowledge of the surrounding HWRF workflow: it only knows about
523  the local track filename, destination filenames and final forecast
524  hours for each destination file."""
525  def __init__(self,dstore,taskname,fhr,fmin,logger=None,**kwargs):
526  """!Creates a TrackerView for the specified datastore and
527  taskname. It will deliver a file subsetted to the specified
528  forecast hour and minute.
529 
530  @param dstore the produtil.datastore.Datastore for database info
531  @param taskname the task name in the database
532  @param fhr,fmin the last expected forecast time
533  @param logger The given logger is used to log
534  messages.
535  @param kwargs Other keywoard arguments are passed to the
536  produtil.datastore.Task.__init__() constructor."""
537  super(TrackerView,self).__init__(dstore=dstore,taskname=taskname,
538  logger=logger,**kwargs)
539  self.fhr=int(fhr)
540  self.fmin=int(fmin)
541 
542  ##@var fhr
543  # Last expected forecast hour
544 
545  ##@var fmin
546  # Last expected forecast minute
547  def run(self):
548  """!Performs the actual delivery. Queries the database for
549  all products for this task that match certain requirements.
550  All of this task's RawATCFProduct products are delivered, as
551  well as any CleanATCFSubsetProduct products that have the
552  right forecast time. Type name is obtained from the
553  database's prodtype column in the PRODUCTS table. It is
554  assumed that the products for this task have a database id
555  beginning with "taskname::" where "taskname" is this task's
556  name. These assumptions match what is done by the
557  TrackerTask."""
558  ds=self.dstore
559  logger=self.log()
560  work=list()
561  me=self.taskname
562  now=self.fmin/60.0
563  with ds.transaction() as t:
564  prodids=t.query('SELECT id,type FROM products WHERE id LIKE "'
565  +self.taskname+'%"')
566  for (prodid,prodtype) in prodids:
567  logger.info('%s: prodtype=%s'%(prodid,prodtype))
568  m=re.match('(.*)::(.*)',prodid)
569  if not m:
570  logger.warning('%s: product id does not match '
571  'category::prodname pattern'%(prodid,))
572  (category,prodname)=m.groups()
573  if category!=me:
574  logger.info('%s: not my track (want category %s got %s)'
575  %(prodid,me,category))
576  elif prodtype=='RawATCFProduct':
577  prodobj=RawATCFProduct(dstore=ds,prodname=prodname,
578  category=category)
579  work.append(prodobj.deliver)
580  elif prodtype=='CleanATCFSubsetProduct':
581  try:
582  prodobj=CleanATCFSubsetProduct(
583  dstore=ds,prodname=prodname,category=category)
584  if prodobj.available:
585  logger.info('%s: already delivered. Will '
586  'not recopy.'%(prodid,))
587  continue
588  if 'subset_fhr' in prodobj:
589  subset_fhr=float(prodobj['subset_fhr'])
590  if subset_fhr>now:
591  logger.info(
592  '%s: not delivering yet. Too early: '
593  '%fhrs > %fhrs'%(prodid,subset_fhr,now))
594  continue
595  else:
596  logger.warning(
597  '%s: It is time to deliver now. Deliver'
598  ' at %fhrs, time is %fhrs'
599  %(prodid,subset_fhr,now))
600  else:
601  logger.warning('%s: CleanATCFSubsetProduct has '
602  'no subset_fhr.'%(prodid,))
603  work.append(prodobj.deliver)
604  except Exception as e:
605  logger.error('%s: unexpected exception: %s'%(
606  prodid,str(e)),exc_info=True)
607  raise
608  else:
609  logger.warning(
610  '%s: unrecognized prod type "%s" (only '
611  'RAWATCFProduct or CleanATCFSubsetProduct allowed).'
612  ' Will not deliver this while gettrk is running.'
613  %(prodid,prodtype))
614  if self.fmin<1e-3:
615  now=self.fhr
616  else:
617  now=self.fmin/60.
618  for func in work:
619  func(self, now,logger=logger)
620  if abs(now)<1e-3 or ( now>0 and abs((now-6.0)%24.0)==0 ):
621  self.postmsg('%s reached fhr=%s fmin=%s'
622  %(self.taskname,repr(self.fhr),repr(self.fmin)))
623  logger.info('%s reached fhr=%s fmin=%s'
624  %(self.taskname,repr(self.fhr),repr(self.fmin)))
625 
626 ########################################################################
627 
628 ##@var tracker_delivery_script
629 # A short Python script run by the gfdl_vortextracker Fortran
630 # program to deliver the track file. Do not use this directly.
631 # It is passed through .format(...) to insert certain data.
632 tracker_delivery_script='''#! /usr/bin/env python
633 import sys
634 import produtil.setup, produtil.datastore
635 import hwrf.tracker
636 produtil.setup.setup()
637 t=hwrf.tracker.TrackerView(
638  dstore=produtil.datastore.Datastore("""{dstorepath}"""),
639  taskname="""{taskname}""",
640  fhr=int(sys.argv[1]),
641  fmin=int(sys.argv[2]) )
642 t.log().info("In ./deliver script")
643 t.run()'''
644 """The delivery script for the tracker. This should not be used
645 directly: the TrackerTask sends it through .format(...) to insert
646 certain data."""
647 
648 ########################################################################
650  """!This task runs the GFDL Vortex Tracker on HWRF output. It is
651  intended to be used with the GRIBTask. At present, this only
652  supports a moving grid tracker. Placeholders are present for a
653  stationary grid tracker invocation, but those are not tested."""
654 
655  ##@var NOVITALS
656  # Constant used to represent the lack of vitals data.
657  NOVITALS=''
658  def __init__(self,ds,conf,section,start,end,step,write_vit=True,
659  phaseflag=True,structflag=False,ikeflag=False,
660  realtime=True,**kwargs):
661  """!Creates a new TrackerTask
662 
663  @param ds the produtil.datastore.Datastore to use
664  @param conf the hwrf.config.HWRFConfig for configuration options
665  @param section the HWRFConfig section to use
666  @param start,end,step the times at which data may be available
667  from the GRIBTasks.
668  @param realtime the tracker should wait for data to appear and
669  call delivery scripts at every forecast time.
670  @param write_vit,phaseflag,structflag,ikeflag Passded on to the tracker namelist.
671  @param kwargs passed to hwrf.hwrftask.HWRFTask.__init__()
672 
673  See the tracker documentation for details."""
674  super(TrackerTask,self).__init__(ds,conf,section,**kwargs)
675  self._flags={'write_vit':bool(write_vit),
676  'phaseflag':bool(phaseflag),
677  'structflag':bool(structflag),
678  'ikeflag':bool(ikeflag),
679  'realtime':bool(realtime)}
680  self._vitals=dict()
681  self._grids=dict()
682  self._start=to_datetime(start)
683  self._end=to_datetime_rel(end,start)
684  self._step=to_timedelta(step)
685  self._moving=None
686  self._deliveries=list()
687  self._products=dict()
688  self._nml=hwrf.namelist.Conf2Namelist(conf=self.conf,section=self.section)
689  self._gmodname=self._nml.nl_get('fnameinfo','gmodname')
690  self._rundescr=self._nml.nl_get('fnameinfo','rundescr')
691  self._nml.nl_set_if_unset('atcfinfo','atcfymdh',
692  int(self._start.strftime('%Y%m%d%H')))
693  def send_raw_atcfunix(self,prodname,location,category=None):
694  """!Requests delivery of the raw, unmodified atcfunix file to
695  the specified location. This creates a new RawATCFProduct
696  with the specified product name and category. If the category
697  is unspecified or None, then this class's taskname will be
698  used. Note that the taskname MUST be used if running in
699  realtime=True mode.
700  @param prodname the name of the new product
701  @param location the output file location, which will
702  be sent through confstrinterp() to do string replacement
703  on forecast and analysis times, and other variables
704  @param category if specified, the product category. By default,
705  this task's taskname is used."""
706  category=category if(category is not None) else self.taskname
707  sloc=self.confstrinterp(location)
708  prod=RawATCFProduct(dstore=self.dstore,
709  prodname=prodname,category=category,location=sloc)
710  prod.location=sloc
711  self._deliveries.append((None, prod))
712  self._products[prodname]=prod
713  return prod
714  def product(self,name):
715  """!Returns the tracker Product with the specified product name
716  or raises KeyError if none is found.
717  @param name the product name"""
718  return self._products[name]
719  def products(self,name=None):
720  """!Iterates over all tracker Products.
721  @param name only iterate over this named product"""
722  if name is not None and name in self._products:
723  yield self._products[name]
724  else:
725  for (n,p) in self._products.iteritems(): yield p
726  def send_atcfunix_subset(self,prodname,location,fhr=None,category=None,freq=1,cut=None):
727  """!Requests delivery of a cleaned subset of the tracker
728  atcfunix file, only up to the specified forecast hour. This
729  creates a new CleanATCFSubsetProduct with the specified
730  product name and category. If the category is unspecified or
731  None, then this class's taskname will be used. Note that the
732  taskname MUST be used if running in realtime=True mode.
733  @param prodname the name of the product
734  @param location the delivery location, which will be sent
735  through confstrinterp() to perform string replacement
736  @param fhr the last forecast hour
737  @param category the product category to use instead of self.taskname
738  @param freq the output ATCF frequency in hours
739  @param cut the last character to include in each line. For example,
740  112 is the RMW"""
741  if fhr is None:
742  fhr=int(to_fraction(self._end-self._start)/3600.0)
743  assert(fhr>0)
744  elif isinstance(fhr,datetime.timedelta):
745  fhr=math.ceil(to_fraction(fhr)/3600.0)
746  else:
747  fhr=int(math.ceil(fhr))
748  sys.stderr.write('FHR='+repr(fhr)+'\n')
749  assert(fhr>=0 and fhr<=126) # FIXME: REMOVE
750  category=category if(category is not None) else self.taskname
751  sloc=self.confstrinterp(location)
752  assert(fhr is not None)
753  prod=CleanATCFSubsetProduct(dstore=self.dstore,
754  prodname=prodname,category=category,location=sloc)
755  prod['subset_fhr']=str(fhr)
756  prod['subset_freq']=str(freq)
757  if cut is not None:
758  prod['line_cut']=str(cut)
759  else:
760  prod['line_cut']='9999'
761  prod.location=sloc
762  assert('subset_fhr' in prod)
763  assert(prod['subset_fhr'] is not None)
764  assert(prod['subset_fhr'] != '')
765  #assert(float(prod['subset_fhr'])>=12)
766  self._deliveries.append((fhr, prod))
767  self._products[prodname]=prod
768  return prod
769  def send_atcfunix(self,prodname,location,category=None):
770  """!Requests delivery of a cleaned-up version of the full
771  length atcfunix file. This is the same as calling
772  send_atcfunix_subset with the full simulation length as the
773  end time. The prodname will be the Product's name, and the
774  location is the delivery location. If specified and not None,
775  the category will be the Product's category, otherwise it will
776  be this Task's taskname. Note that if realtime=True is
777  enabled, the category must be the taskname.
778  @param prodname the new product's name
779  @param location the delivery location
780  @param category the product category instead of self.taskname"""
781  return self.send_atcfunix_subset(prodname,location,
782  self._end-self._start,category=category)
783  def getwrite_vit(self):
784  """!Flag: output genesis vitals?"""
785  return self._flags['write_vit']
786 
787  ##@property write_vit
788  # Output genesis vitals? Read-only property.
789  write_vit=property(getwrite_vit,None,None,'Flag: output genesis vitals?')
790  def getphaseflag(self):
791  """!Flag: output phase info?"""
792  return self._flags['phaseflag']
793  ##@property phaseflag
794  # Output phase info? Read-only property.
795  phaseflag=property(getphaseflag,None,None,'Flag: output phase info?')
796  def getstructflag(self):
797  """!Flag: output structure info?"""
798  return self._flags['structflag']
799  ##@property structflag
800  # Output storm structure information? Read-only property.
801  structflag=property(getstructflag,None,None,
802  'Flag: output storm structure information?')
803  def getikeflag(self):
804  """!Flag: output IKE info?"""
805  return self._flags['ikeflag']
806 
807  ##@property ikeflag
808  # Output storm IKE information? Read-only property.
809  ikeflag=property(getikeflag,None,None,
810  'Flag: output storm IKE information?')
811  def getrealtime(self):
812  """!Flag: run in real-time mode?"""
813  return self._flags['realtime']
814 
815  ##@property realtime
816  # Run in real-time mode? Read-only property.
817  realtime=property(getrealtime,None,None,'Flag: run in real-time mode?')
818 
819  def grab_gribs(self,gribtask,gribname):
820  """!Loops through the given gribtask, getting all products with
821  specified name (gribname) for the times requested in the
822  TrackerBase constructor. Will raise MissingGRIBError if any
823  times have no product. Returns a TimeArray of Product
824  objects.
825  @param gribtask the hwrf.gribtask.GRIBTask to query
826  @param gribname the grib product name"""
827  gribs=TimeArray(self._start,self._end,self._step)
828  dt=to_fraction(self._step)/10
829  for rtime in gribs.times():
830  for ptime,product in gribtask.products(name=gribname,time=rtime,
831  yieldtime=True):
832  timediff=abs(to_fraction(ptime-rtime,negok=True))
833  self.log().debug('%s timediff %s at time %s'%
834  (gribname,str(timediff),rtime.strftime(
835  '%Y%m%d.%H%M%S')))
836  if timediff>dt:
837  self.log().debug('%s: no %s at time %s'%
838  ( gribtask.taskname,gribname,
839  rtime.strftime('%Y%m%d.%H%M%S') ))
840  continue
841  else:
842  self.log().debug('%s: found %s at time %s'%
843  ( gribtask.taskname,gribname,
844  rtime.strftime('%Y%m%d.%H%M%S') ))
845  gribs[rtime]=product
846  break
847  return gribs
848 
849  def add_moving_grid(self,tcvitals,gribtask,gribname):
850  """!Tells the tracker to run in moving grid mode, for the
851  specified storm.
852 
853  @param tcvitals The tcvitals must be an hwrf.storminfo.StormInfo.
854  @param gribtask The gribtask must be an hwrf.gribtask.GRIBTask.
855  @param gribname The gribname is the name in the
856  hwrf.regrib.RegribMany of the operation that produces the
857  tracker input for this moving storm."""
858  if self._moving is False:
859  raise TrackerModeError(
860  'Cannot mix stationary regional and moving grids in one '
861  'tracker invocation.')
862  if self._moving is True:
863  raise TrackerModeError(
864  'Cannot have multiple moving grids: the current tracker '
865  'implementation does not support that. Use multiple '
866  'TrackerTask objects instead.')
867  if self._storminfo is None: self._storminfo=tcvitals
868  self._moving=True
869  gribs=self.grab_gribs(gribtask,gribname)
870  stid=tcvitals.longstormid
871  if stid in self._grids or stid in self._vitals:
872  raise TrackerStormError(
873  '%s: More than one of this storm specified.'%(stid,))
874  try:
875  self._grids[stid]=gribs
876  self._vitals[stid]=tcvitals
877  except Exception:
878  if stid in self._grids: del self._grids[stid]
879  if stid in self._vitals: del self._vitals[stid]
880  raise
881 
882  def add_regional_grid(self,gribtask,gribname):
883  """!Adds a stationary grid on which to run the tracker.
884  @param gribtask The gribtask must be an hwrf.gribtask.GRIBTask
885  to create the input.
886  @param gribname The gribname is the operation name in the grib task's
887  hwrf.regrib.RegribMany for the GRIB data source."""
888  if self._moving is True:
889  raise TrackerModeError(
890  'Cannot mix stationary regional and moving grids in one '
891  'tracker invocation.')
892  self._moving=False
893  gribs=self.grab_gribs(gribtask,gribname)
894  self._grids[self.NOVITALS]=gribs
895 
896  def add_storm(self,tcvitals):
897  """!Specifies the tcvitals for a storm to track.
898  @param tcvitals the hwrf.storminfo.StormInfo to use"""
899  if self._moving is None:
900  raise TrackerModeError(
901  'Specify tracker mode before adding storms. Call '
902  'add_moving_grid or add_regional_grid before add_storm')
903  if self._moving is True:
904  raise TrackerModeError(
905  'In moving grid mode, storms must have associated grid '
906  'data. Use add_moving_grid instead of add_storm.')
907  self._vitals[tcvitals.longstormid]=tcvitals
908 
909  def link_grib(self,inputlist):
910  """!Symbolically links all input GRIB1/2 files to the current
911  working directory using the file naming convention expected by
912  the tracker. If self.realtime=False, then will also check for
913  the files' existence and will raise MissingGRIBError if one is
914  missing. If self.realtime=True, then it is assumed the
915  tracker is being run in waitfor mode and will wait for the
916  data on its own. In either case, GRIBLocationError will be
917  raised if any GRIB's Product.location is None or '' (the
918  location must be known to make the symlink).
919  @param inputlist a file to create with the list of tracker
920  input files"""
921  logger=self.log()
922  for stid in self._grids.keys():
923  strstid=str(stid)
924  logger.info('process storm %s'%(strstid,))
925  if stid in self._vitals:
926  longstormid=str(self._vitals[stid].longstormid)
927  else:
928  longstormid='regional'
929  grids=self._grids[stid]
930  atcfymdh=int(self._nml.nl_get('atcfinfo','atcfymdh'))
931  linkstart='%s.%s.%s.%10d.f' % \
932  (self._gmodname,self._rundescr,longstormid,atcfymdh)
933  for (ftime,grib) in grids.iteritems():
934  logger.info('storm %s time %s'%(strstid,str(ftime)))
935  if not self.realtime:
936  while not grib.check():
937  logger.info('%s: not yet available. Sleeping.')
938  time.sleep(20)
939  loc=grib.location
940  locix=grib.grib1grbindex
941  if loc is None or loc=='':
942  raise GRIBLocationError(
943  '%s: no location specified for this GRIB product.'
944  %(grib.did,))
945  minutes=int(to_fraction(ftime-self._start)/60)
946  produtil.fileop.make_symlink(loc,'%s%05.5d'%(
947  linkstart,minutes),logger=logger)
948  if locix is None or locix=='':
949  logger.info(
950  '%s: no grbindex file location specified for this '
951  'GRIB product. Assuming .grbindex extension of '
952  'the GRIB file.'%(grib.did,))
953  locix='%s.grbindex'%(loc,)
955  locix,'%s%05.5d.ix'%(linkstart,minutes),logger=logger)
956  with open(inputlist,'wt') as f:
957  i=0
958  for ftime in self._grids[self._grids.keys()[0]].datatimes():
959  i+=1
960  minute=int(to_fraction(ftime-self._start)/60)
961  f.write("%4d %5d\n"%(i,minute))
962 
963  def concat_grib(self,filename,ixfilename):
964  """!Concats all input GRIB1/2 files into a single file, waiting
965  for each one to become available first. Produces the grbindex
966  output for that file at the end. Only works in regional grid
967  mode, and will raise TrackerModeError if the mode is unknown
968  or moving grid.
969 
970  @note This is routine is untested because it is a bad idea.
971  It results in duplication of an enormous amount of data.
972  However, historically, this is how the tracker used to be
973  run.
974  @param filename the input GRIB1 file name
975  @param ixfilename the output index file name"""
976  with open(filename,'wb') as outf:
977  outf.truncate(0)
978  for product in self._gribs:
979  while not product.check():
980  self.log().info('%s: not yet available. Sleeping...'
981  %(product.did))
982  time.sleep(20)
983  with open(filename,'ab') as outf:
984  with open(str(product.location()),'rb') as inf:
985  shutil.copyfileobj(inf,outf)
986  grbindex=alias(bigexe(self.getexe('grbindex')))
987  if self.confbool('sync_frequently',True):
988  runsync()
989  checkrun(grbindex['filename','ixfilename'])
990 
991  def make_namelist(self,filename):
992  """!Construct the tracker namelist and write it to the
993  specified file. The "realtime" flag is True when running in
994  real-time mode, and False if not.
995  @param filename the output namelist file's name"""
996  realtime=bool(self.realtime)
997 
998  # Make some aliases to shorten the below code:
999  nml=self._nml # the Conf2Namelist object with the namelist
1000  tm=self._start # the datetime with the start time
1001  ns=nml.nl_set # force namelist to have certain values
1002  nlsiu=nml.nl_set_if_unset # provide default values if missing
1003 
1004  ns('datein','inp%bcc',int(tm.year)/100)
1005  ns('datein','inp%byy',int(tm.year)%100)
1006  ns('datein','inp%bmm',int(tm.month))
1007  ns('datein','inp%bdd',int(tm.day))
1008  ns('datein','inp%bhh',int(tm.hour))
1009  nlsiu('datein','inp%modtyp','regional')
1010  nlsiu('datein','inp%lt_units','hours')
1011  ns('datein','inp%file_seq','multi')
1012  ns('datein','inp%nesttyp',
1013  'moveable' if(self._moving) else 'stopped')
1014  nlsiu('atcfinfo','atcfnum',81)
1015  nlsiu('atcfinfo','atcfname','TEST')
1016 
1017  hours=int(to_fraction(self._step)/3600)
1018  minutes=int(to_fraction(self._step)/60-60*hours)
1019  ns('atcfinfo','atcffreq',hours*100+minutes)
1020  nlsiu('trackerinfo','trkrinfo%type','tracker')
1021  nlsiu('trackerinfo','trkrinfo%mslpthresh',0.0015)
1022  nlsiu('trackerinfo','trkrinfo%v850thresh',1.5000)
1023  nlsiu('trackerinfo','trkrinfo%gridtype','regional')
1024  nlsiu('trackerinfo','trkrinfo%contint',100.0)
1025  nlsiu('trackerinfo','trkrinfo%out_vit','y' if(self.write_vit) else 'n')
1026  nlsiu('trackerinfo','trkrinfo%gribver',1)
1027  nlsiu('trackerinfo','trkrinfo%g2_jpdtn',0)
1028  nlsiu('phaseinfo','phaseflag','y' if(self.phaseflag) else 'n')
1029  nlsiu('phaseinfo','phasescheme','both')
1030  nlsiu('phaseinfo','wcore_depth',1.0)
1031  nlsiu('structinfo','structflag','y' if (self.structflag) else 'n')
1032  nlsiu('structinfo','ikeflag','y' if(self.ikeflag) else 'n')
1033 
1034  for stid in self._grids.keys():
1035  if stid in self._vitals:
1036  longstormid=str(self._vitals[stid].longstormid)
1037  else:
1038  longstormid='regional'
1039  break
1040  ns('fnameinfo','atcfdescr',longstormid)
1041  nlsiu('verbose','verb',3)
1042  ns('waitinfo','use_waitfor', ('y' if(realtime) else 'n') )
1043  nlsiu('waitinfo','wait_min_age',10)
1044  nlsiu('waitinfo','wait_min_size',100)
1045  nlsiu('waitinfo','wait_max_wait',1800)
1046  nlsiu('waitinfo','wait_sleeptime',5)
1047 
1048  if self.realtime:
1049  ns('waitinfo','use_per_fcst_command','y')
1050  ns('waitinfo','per_fcst_command',"./deliver %[FHOUR] %[FMIN]")
1051  else:
1052  ns('waitinfo','use_per_fcst_command','n')
1053 
1054  # Intel Fortran is fussy about the order of the namelists and
1055  # the order of the values in the namelist, so we order them
1056  # here:
1057  order=['datein','atcfinfo','trackerinfo','phaseinfo','structinfo',
1058  'fnameinfo','verbose','waitinfo']
1059  Odatein=['inp%bcc','inp%byy','inp%bmm','inp%bdd','inp%bhh',
1060  'inp%model','inp%modtyp','inp%lt_units','inp%file_seq',
1061  'inp%nesttyp']
1062  Oatcfinfo=['atcfnum','atcfname','atcfymdh','atcffreq']
1063  Otrackerinfo=['trkrinfo%westbd','trkrinfo%eastbd',
1064  'trkrinfo%northbd','trkrinfo%southbd',
1065  'trkrinfo%type','trkrinfo%mslpthresh',
1066  'trkrinfo%v850thresh','trkrinfo%gridtype',
1067  'trkrinfo%contint','trkrinfo%out_vit']
1068  Ophaseinfo=['phaseflag','phasescheme','wcore_depth']
1069  Ostructinfo=['structflag','ikeflag']
1070  Ofnameinfo=['gmodname','rundescr','atcfdescr']
1071  Owaitinfo=['use_waitfor','wait_min_age','wait_min_size',
1072  'wait_sleeptime','use_per_fcst_command',
1073  'per_fcst_command']
1074  varorder={ 'datein':partial_ordering(Odatein),
1075  'atcfinfo':partial_ordering(Oatcfinfo),
1076  'trackerinfo':partial_ordering(Otrackerinfo),
1077  'phaseinfo':partial_ordering(Ophaseinfo),
1078  'structinfo':partial_ordering(Ostructinfo),
1079  'fnameinfo':partial_ordering(Ofnameinfo),
1080  'verbose':partial_ordering(['verb']),
1081  'waitinfo':partial_ordering(Owaitinfo) }
1082  with open(str(filename),'wt') as f:
1083  f.write(nml.make_namelist(section_sorter=partial_ordering(order),
1084  var_sorters=varorder))
1085 
1086  def run(self):
1087  """!Runs the tracker and delivers the resulting track files."""
1088  logger=self.log()
1089  gettrk=alias(bigexe(self.getexe('gettrk')))
1090  realtime=bool(self.realtime)
1091  # Update the database to say that no tracks are delivered
1092  for (f,p) in self._deliveries:
1093  p.available=False
1094  wd=self.workdir
1095  assert(wd)
1096  if os.path.exists(wd):
1097  shutil.rmtree(wd)
1098  with NamedDir(wd):
1099  # Link all input GRIB files and create the input list file:
1100  logger.info('link tracker input')
1101  self.link_grib('input.fcsttime')
1102 
1103  with open('deliver','wt') as f:
1104  f.write(tracker_delivery_script.format(
1105  dstorepath=self.dstore.filename,
1106  taskname=self.taskname))
1107  ofstat=os.fstat(f.fileno())
1108  os.fchmod(f.fileno(),ofstat.st_mode|755)
1109 
1110  # Create the tcvitals input file:
1111  with open('input.vitals','wt') as vitf:
1112  for stid,vital in self._vitals.iteritems():
1113  vitf.write(vital.as_tcvitals())
1114 
1115  # Four files must exist and be empty, so we create them here:
1116  # 13, 14: used for genesis vitals, unused here
1117  # 11, 31: used for single combined input GRIB, unused here
1118  for empty in [13,14,11,31]:
1119  with open('fort.%d'%(empty,),'wb') as f:
1120  f.truncate(0)
1121 
1122  # Make the tracker namelist:
1123  logger.info('make tracker namelist')
1124  self.make_namelist('input.nml')
1125  assert(isnonempty('input.nml'))
1126 
1127  # Prefix for all output files:
1128  pre='output'
1129 
1130  # Generate the list of fort.* files to link:
1131  linkme={
1132  15:'input.fcsttime',
1133  12:'input.vitals',
1134  61:'%s.all'%(pre,),
1135  62:'%s.atcf'%(pre,),
1136  63:'%s.radii'%(pre,),
1137  64:'%s.atcfunix'%(pre,),
1138  66:'%s.atcf_gen'%(pre,),
1139  68:'%s.atcf_sink'%(pre,),
1140  69:'%s.atcf_hfip'%(pre,),
1141  }
1142 
1143  if self.write_vit: linkme[67]='%s.genvitals'%(pre,)
1144  if self.phaseflag: linkme[71]='%s.cps_parms'%(pre,)
1145  if self.ikeflag: linkme[74]='%s.ike'%(pre,)
1146 
1147  if self.structflag:
1148  linkme[72]='%s.structure'%(pre,)
1149  linkme[73]='%s.fractwind'%(pre,)
1150  linkme[76]='%s.pdfwind'%(pre,)
1151 
1152  for forti in sorted(linkme.keys()):
1153  logger.info('Link: fort.%d = %s'%(forti,linkme[forti]))
1154 
1155  # Link fort.* files:
1156  produtil.fileop.fortlink(linkme,force=True)
1157 
1158  # Run the tracker:
1159  self.postmsg('Starting gettrk.')
1160  cmd=(gettrk < './input.nml' )
1161  if self.redirect:
1162  cmd = cmd >= 'tracker.log'
1163  logger.warning(repr(cmd))
1164  try:
1165  if self.confbool('sync_frequently',True):
1166  runsync()
1167  checkrun(cmd)
1168  self.postmsg('Successful return status from gettrk.')
1169  except(Exception) as e:
1170  logger.critical('GETTRK FAILED: '+str(e),exc_info=True)
1171  raise
1172 
1173  # Copy tracker log file to outdir:
1174  if self.redirect:
1175  tgt=os.path.join(self.outdir,'tracker.log')
1176  deliver_file('tracker.log',tgt,keep=True,logger=logger)
1177 
1179 
1180  def call_completed_callbacks(self,logger=None):
1181  """!Calls callbacks for all completed products.
1182  @param logger a logging.Logger for log messages"""
1183  if logger is None: logger=self.log()
1184  # Run callbacks for all delivered products:
1185  for prod in self.products():
1186  prod.update() # invalidate cache, recheck DB
1187  if prod.available:
1188  if not prod.has_callbacks():
1189  logger.warning('%s: rejoice: completed!! (no '
1190  'callbacks)'%(prod.did,))
1191  else:
1192  logger.warning('%s: rejoice: completed!! (has '
1193  'callbacks)'%(prod.did,))
1194  prod.call_callbacks()
1195  else:
1196  logger.error('%s: tracker product is not complete.'
1197  %(prod.did,))
def send_raw_atcfunix
Requests delivery of the raw, unmodified atcfunix file to the specified location. ...
Definition: tracker.py:693
Change directory, handle temporary directories.
Definition: cd.py:1
ikeflag
Output storm IKE information? Read-only property.
Definition: tracker.py:809
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def hwrf_global_subset(w)
A simple wrapper around hwrf_diagnostic_subset that provides a subset of the HWRF fields and vertical...
Definition: tracker.py:117
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
Generates a Fortran namelist entirely from config files.
Definition: namelist.py:411
def jtwc_rewrite
Rewrites track files as used by the HWRF WPAC parallels from 2013-2015.
Definition: tracker.py:26
def add_storm(self, tcvitals)
Specifies the tcvitals for a storm to track.
Definition: tracker.py:896
This Task is used by the delivery script executed by the gettrk program itself.
Definition: tracker.py:519
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def concat_grib(self, filename, ixfilename)
Concats all input GRIB1/2 files into a single file, waiting for each one to become available first...
Definition: tracker.py:963
def redirect(self)
Should subprograms' outputs be redirected to separate files?
Definition: hwrftask.py:190
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
A subclass of Product that represents file delivery.
Definition: datastore.py:856
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def __init__(self, op)
GRIB1VintTave constructor.
Definition: tracker.py:246
def getphaseflag(self)
Flag: output phase info?
Definition: tracker.py:790
A FileProduct that delivers a subset of a track.
Definition: tracker.py:415
This task runs the GFDL Vortex Tracker on HWRF output.
Definition: tracker.py:649
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def product(self, name)
Returns the tracker Product with the specified product name or raises KeyError if none is found...
Definition: tracker.py:714
This subclass of GRIBOp produces GRIB1 files.
Definition: regrib.py:1552
def action_vinttave(self, regrib, ingrib, time=None, task=None, blocksize=1048576, atime=None, kwargs)
Performs the actual work of GRIB1VintTave.make: copying input, running vint and tave, copying output.
Definition: tracker.py:262
def deliver
Reads the tracker output, discarding everything after the specified forecast hour.
Definition: tracker.py:429
def deliver
Copies the tracker output to its destination without modification.
Definition: tracker.py:399
def send_atcfunix
Requests delivery of a cleaned-up version of the full length atcfunix file.
Definition: tracker.py:769
def add_regional_grid(self, gribtask, gribname)
Adds a stationary grid on which to run the tracker.
Definition: tracker.py:882
def action_expandlatlon(op, regrib, name, center, west, east, north, south, n0, n1, scan, res0, res1, kwargs)
Internal function that expands a lat-lon grid to make a bigger one.
Definition: tracker.py:166
def hwrf_combine_subset(w)
A simple wrapper around hwrf_diagnostic_subset that provides a subset of the HWRF fields and vertical...
Definition: tracker.py:109
Represents a process or actor that makes a Product.
Definition: datastore.py:1052
def getstructflag(self)
Flag: output structure info?
Definition: tracker.py:796
realtime
Run in real-time mode? Read-only property.
Definition: tracker.py:817
def log(self)
Returns the logger object for this task.
Definition: datastore.py:1149
def send_atcfunix_subset
Requests delivery of a cleaned subset of the tracker atcfunix file, only up to the specified forecast...
Definition: tracker.py:726
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
section
The confsection in self.section for this HWRFTask (read-only)
Definition: hwrftask.py:422
def tracker_subset(wgribout)
This is a GRIB subsetter intended to be sent to the subset operator of an hwrf.regrib.GRIB1Op object, to produce GRIB1 files that contain only fields needed by the tracker.
Definition: tracker.py:141
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
def args(self)
Iterates over all child GRIBBase objects.
Definition: regrib.py:1478
fmin
Last expected forecast minute.
Definition: tracker.py:540
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
structflag
Output storm structure information? Read-only property.
Definition: tracker.py:801
def get
Alias for self.meta() Returns the value of the specified metadata key or returns default if it is uns...
Definition: datastore.py:637
fhr
Last expected forecast hour.
Definition: tracker.py:539
This class represents a GRIBGrid that must be computed from some other input, usually a GRIB file...
Definition: regrib.py:1249
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
def expandlatlon
Creates a GRID suitable for use with the hwrf.regrib module.
Definition: tracker.py:216
def getrealtime(self)
Flag: run in real-time mode?
Definition: tracker.py:811
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
location
Read-write property, an alias for getlocation() and setlocation().
Definition: datastore.py:563
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
Time manipulation and other numerical routines.
Definition: numerics.py:1
Raised when an impossible tracker configuration is requested, such as running with a grid that is bot...
Definition: exceptions.py:155
def run(self)
Performs the actual delivery.
Definition: tracker.py:547
write_vit
Output genesis vitals? Read-only property.
Definition: tracker.py:789
def fortlink
This is a convenience routine that makes many symbolic links to fort.N files for various integers N u...
Definition: fileop.py:834
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def call_completed_callbacks
Calls callbacks for all completed products.
Definition: tracker.py:1180
def getwrite_vit(self)
Flag: output genesis vitals?
Definition: tracker.py:783
string NOVITALS
Constant used to represent the lack of vitals data.
Definition: tracker.py:657
def make(self, regrib, kwargs)
This runs the actual vint and tave programs on the GRIB1 file that comes from the argument provided t...
Definition: tracker.py:250
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def run(self)
Runs the tracker and delivers the resulting track files.
Definition: tracker.py:1086
This subclass of GRIB1Op represents a GRIB1 file on disk that is ALREADY PRESENT. ...
Definition: regrib.py:1667
def __init__(self, dstore, taskname, fhr, fmin, logger=None, kwargs)
Creates a TrackerView for the specified datastore and taskname.
Definition: tracker.py:525
A FileProduct for tracker output.
Definition: tracker.py:393
A time-indexed array that can only handle equally spaced times.
Definition: numerics.py:689
This class represents a GRIB1 or GRIB2 grid specification.
Definition: regrib.py:1151
def getikeflag(self)
Flag: output IKE info?
Definition: tracker.py:803
def __init__(self, kwargs)
Create a CleanATCFSubsetProduct that will deliver a track with the specified last allowable forecast ...
Definition: tracker.py:421
def add_moving_grid(self, tcvitals, gribtask, gribname)
Tells the tracker to run in moving grid mode, for the specified storm.
Definition: tracker.py:849
def products
Iterates over all tracker Products.
Definition: tracker.py:719
did
Read-only property, an alias for getid().
Definition: datastore.py:551
def process_line(self, tracker, lastfhr, freq, cut, line)
Processes one line of the track file.
Definition: tracker.py:472
def grab_gribs(self, gribtask, gribname)
Loops through the given gribtask, getting all products with specified name (gribname) for the times r...
Definition: tracker.py:819
def make_namelist(self, filename)
Construct the tracker namelist and write it to the specified file.
Definition: tracker.py:991
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
Describes regribbing operations using an algebraic structure.
Definition: regrib.py:1
def call_callbacks
Calls all delivery callback functions.
Definition: datastore.py:759
def has_callbacks(self)
Returns True if this Product has any callback functions and False otherwise.
Definition: datastore.py:754
phaseflag
Output phase info? Read-only property.
Definition: tracker.py:795
Raised when multiple storms are requested, but only one was expected.
Definition: exceptions.py:158
def link_grib(self, inputlist)
Symbolically links all input GRIB1/2 files to the current working directory using the file naming con...
Definition: tracker.py:909
A GRIB1Op that runs vint and tave: two of the tracker prep programs.
Definition: tracker.py:241
def hwrf_diagnostic_subset
Selects lines for the hwrfprs_c files.
Definition: tracker.py:56
def __init__(self, ds, conf, section, start, end, step, write_vit=True, phaseflag=True, structflag=False, ikeflag=False, realtime=True, kwargs)
Creates a new TrackerTask.
Definition: tracker.py:660
def make_symlink
Creates a symbolic link "target" that points to "source".
Definition: fileop.py:677