HWRF  trunk@4391
ensda.py
1 """!Utilities for ensemble-based data assimilation.
2 
3 This module contains utilities for doing various forms of
4 ensemble-based data assimilation. It manages a two-dimensional
5 ensemble-time array of Task objects that know how to run each analysis
6 cycle for each member. This module also provides a way of presenting
7 the previous forecast cycle's ensda objects such that this cycle can
8 use them. This is all built on top of the classes in the
9 hwrf.fcsttask module."""
10 
11 ##@var __all__
12 # The symbols exported by "from hwrf.ensda import *"
13 __all__=['DAEnsemble','FromPriorCycle','FromGFSENKF',
14  'write_ensda_flag_file','read_ensda_flag_file',
15  'CycleTDRCheck','AlwaysRunENSDA','enada_pre_object_for']
16 
17 import os
19 
22 
23 from produtil.log import jlogger
24 from produtil.datastore import COMPLETED,UpstreamFile
25 from produtil.fileop import isnonempty, wait_for_files, make_symlink
26 from hwrf.numerics import to_datetime_rel, to_timedelta, TimeArray, to_datetime
27 from hwrf.post import PostManyWRF
28 from produtil.cd import NamedDir
29 from hwrf.wrf import WRFDomain
30 from produtil.run import alias, exe
31 from hwrf.regrib import clatlon
32 
34  """!Represents a two-dimensional ensemble-vs-time array of
35  hwrf.hwrftask.HWRFTask objects."""
def __init__(self,dstore,conf,section,anlintime=None,
             taskname=None,**kwargs):
    """!Constructor for DAEnsemble

    Calls the superclass constructor, stores the analysis input
    time, and creates the (initially empty) time array of members.
    @param dstore the produtil.datastore.Datastore database to use
    @param conf the hwrf.config.HWRFConfig for configuration info
    @param section the section to use in conf
    @param anlintime the analysis input time.  If None, conf.cycle
      is used.  The value is converted with to_datetime_rel,
      relative to conf.cycle.
    @param taskname the name of this task in the database
    @param kwargs passed to the superclass constructor"""
    super(DAEnsemble,self).__init__(dstore,conf,section,taskname,
                                    **kwargs)

    if anlintime is None:
        anlintime=conf.cycle
    self.__anlintime=to_datetime_rel(anlintime,conf.cycle)
    self.__memberids=set()

    # The [config] cycling_interval option multiplied by 3600
    # (presumably hours converted to seconds -- TODO confirm).  It
    # is the default for this section's da_cycle option and the
    # offset of the end time from conf.cycle.
    cycling_interval=conf.getfloat('config','cycling_interval')*3600.0

    self.__tstep=to_timedelta(self.confstr('da_cycle',cycling_interval))
    assert(self.__tstep>to_timedelta(1800))
    endtime=to_datetime_rel(cycling_interval,conf.cycle)

    # The array spans conf.cycle through endtime-tstep and is built
    # with dict as its last argument; set_member() later stores
    # tasks as self.__members[atime][enkfmem].
    self.__members=TimeArray(conf.cycle,endtime-self.__tstep,
                             self.__tstep,dict)
    self.__anlouttime=self.__members.lasttime
63 
@property
def anlintime(self):
    """!The time at the beginning of the first ensemble step.

    @returns the analysis input time stored by the constructor"""
    return self.__anlintime
68 
@property
def anlouttime(self):
    """!The time at the end of the last ensemble step.

    @returns the value the constructor stored from the member
      array's lasttime"""
    return self.__anlouttime
73 
@property
def nmembers(self):
    """!The number of members of the ensemble.

    @returns the number of distinct member ids that have been
      registered through set_member()"""
    return len(self.__memberids)
78 
@property
def nsteps(self):
    """!The number of ensemble DA time steps.

    @returns the length of the time array of members"""
    return len(self.__members)
83 
@property
def anlintimes(self):
    """!Iterates over all ensemble analysis input times.

    Yields every time produced by the member array's times()
    method, in the order that method produces them."""
    for when in self.__members.times():
        yield when
89 
@property
def anlouttimes(self):
    """!Iterates over all ensemble analysis output times.

    Yields every time from the member array's times() method
    except the first one, and then yields the anlouttime."""
    times=iter(self.__members.times())
    # Discard the first input time; it is not an output time.  The
    # default argument avoids StopIteration when there are no times.
    next(times,None)
    for when in times:
        yield when
    yield self.__anlouttime
100 
def member_ids(self):
    """!Iterates over all member ids.

    Yields each id that was registered through set_member().  The
    ids are kept in a set, so the order is unspecified."""
    for ident in self.__memberids:
        yield ident
105 
def set_member(self,atime,enkfmem,task):
    """!Sets the HWRFTask to use for one cycle of one member.

    Tells member enkfmem to use the specified task to produce
    output whose input analysis time is atime.  The member id is
    also recorded in the set of known member ids.
    @param atime the analysis time, a datetime.datetime
    @param enkfmem the enkf member id
    @param task the HWRFTask to use"""
    self.__memberids.add(enkfmem)
    self.__members[atime][enkfmem]=task
116 
def members_at_time(self,atime):
    """!Iterates over members for a specified analysis input time.

    Iterates over all members at the specified anlintime,
    yielding (id,member) tuples.
    @param atime the analysis time.  It is converted with
      to_datetime_rel, relative to this object's anlintime, before
      it is used to index the member array."""
    when=to_datetime_rel(atime,self.__anlintime)
    for (ident,step) in self.__members[when].iteritems():
        yield (ident,step)
126 
128  """!iterate over members at the final analysis output time.
129 
130  Iterates over all members at the final analysis output time,
131  yielding (id,member) tuples."""
132  for (e,m) in self.members_at_time(self.__members.lasttime):
133  yield e,m
134 
def steps_for_member(self,enkfmem):
    """!Iterates over analysis cycles for a specific member.

    Iterates over (time,EnsembleDAMemberStep) pairs for the
    specified member.  A KeyError is raised at the first time for
    which the member has no entry.
    @param enkfmem the ensemble member id"""
    for (when,members) in self.__members.iteritems():
        yield when,members[enkfmem]
142 
def member(self,atime,enkfmem):
    """!Gets the analysis cycle for the specified member and time.

    Returns the da cycle for member enkfmem at analysis input
    time atime.  Unlike members_at_time(), atime is used to index
    the member array directly, without conversion.
    @param atime the analysis time, a datetime.datetime
    @param enkfmem the ensemble id"""
    return self.__members[atime][enkfmem]
151 
def inputiter(self):
    """!Iterates over all needed input data, for hwrf.input.

    Calls inputiter for all steps of all ENKF members.  This is
    for use by the hwrf.input to figure out what input data is
    required for the DA ensemble."""
    for _when,members in self.__members.iteritems():
        for task in members.itervalues():
            for needed in task.inputiter():
                yield needed
162 
def dump(self):
    """!Prints detailed diagnostics.

    Prints one line for every (time, member id) entry of the member
    array, then the array's last time, then one line for every
    member returned by members_at_time() for that last time.  This
    is intended for debugging only."""
    # Each print() call receives a single pre-formatted string, so
    # the output is the same whether print is a statement or a
    # function.
    for (when,members) in self.__members.iteritems():
        for (enkfid,memberstep) in members.iteritems():
            print("self.__members[%s][%s]=%s"%(
                    repr(when),repr(enkfid),repr(memberstep)))
    t=self.__members.lasttime
    print('last time t is %s'%(repr(t),))
    for (enkfid,memberstep) in self.members_at_time(t):
        print('self.__members[t][%s]=%s'%(
                repr(enkfid),repr(memberstep)))
177 
178 ########################################################################
179 
181  """!Represents an ensemble member from the previous forecast cycle.
182 
183  This is used to generate UpstreamFile objects for the previous
184  cycle's ensemble of FromGFSENKF simulations."""
def __init__(self,dstore,conf,section,domains,enkfmem,anlouttime,
             **kwargs):
    """!FromPriorCycle constructor

    Makes a new FromPriorCycle object.
    @param dstore the produtil.datastore.Datastore database object
    @param conf the hwrf.config.HWRFConfig with configuration info
    @param section the section to use within conf
    @param domains the list or tuple of hwrf.wrf.WRFDomain objects
      to obtain from the prior cycle, in order of
      grid ID.  A private copy of the sequence is stored.
    @param enkfmem the enkf member ID; it is converted with int()
    @param anlouttime the output time at the end of the analysis;
      it is converted with hwrf.numerics.to_datetime
    @param kwargs passed to the superclass constructor"""
    super(FromPriorCycle,self).__init__(dstore,conf,section,**kwargs)
    self.__domains=list(domains)
    self.__enkfmem=int(enkfmem)
    self.__anlouttime=hwrf.numerics.to_datetime(anlouttime)
203 
def products(self,domains=None,**kwargs):
    """!Iterates over all products

    Iterates over products produced by the prior forecast cycle's
    analysis cycle.  For a domain whose is_moad() is true the
    result of get_wrfinput() is yielded; for any other domain the
    result of get_wrfanl() is yielded.  Both are requested at this
    object's anlouttime.
    @param domains if provided and non-None, only products from these
      domains are yielded.  Every one of them must be one of the
      domains given to the constructor.
    @param kwargs ignored """
    if domains is None:
        domains=self.__domains
    assert(domains)
    for domain in domains:
        assert(domain in self.__domains)
        # When assertions are disabled, unknown domains are skipped
        # silently instead.
        if domain not in self.__domains:
            continue
        if domain.is_moad():
            yield self.get_wrfinput(domain,self.__anlouttime)
        else:
            yield self.get_wrfanl(domain,self.__anlouttime)
def get_wrfinput(self,domain,atime):
    """!Returns the product for the specified domain and analysis time.

    Returns the product for the wrfinput_d01 file for the
    specified domain and time.  This is simply a wrapper around
    get_product(domain,atime)
    @param domain the domain of interest
    @param atime the analysis time as a datetime.datetime
    @returns whatever get_product(domain,atime) returns"""
    return self.get_product(domain,atime)
def get_wrfanl(self,domain,atime):
    """!Returns the wrfanl file's product.

    This is a wrapper around get_product(domain,atime).
    @param domain the domain of interest
    @param atime the analysis time
    @returns whatever get_product(domain,atime) returns"""
    return self.get_product(domain,atime)
def get_product(self,domain,atime):
    """!Returns a product for the specified domain and time.

    Creates a new produtil.datastore.UpstreamFile for the wrfinput
    or wrfanl file for the given domain, at the given analysis
    time.  The analysis time must be the anlouttime.  The file
    location is built from the oldcom and oldvit configuration
    values, this object's enkf member id and the domain's grid id,
    and check() is called on the new product before it is returned.
    @return a newly created UpstreamFile for the file of interest,
      or None if atime is not the anlouttime or the domain is not
      one of the domains given to the constructor
    @param domain the domain of interest
    @param atime the analysis time"""
    logger=self.log()
    atime=hwrf.numerics.to_datetime(atime)
    if atime!=self.__anlouttime:
        logger.info('Wrong atime: %s vs %s'
                    %(str(atime),str(self.__anlouttime)))
        return None
    if domain not in self.__domains:
        logger.info('Invalid domain: %s not in %s'%(
            str(domain), ', '.join([str(x) for x in self.__domains])))
        return None
    loc=self.confstrinterp(
        '{oldcom}/{oldvit[stormnamelc]}{oldvit[stormid3lc]}.{oldvit[YMDH]}.'
        'ensda_{enkfid:03d}.wrfinput_d{domid:02d}',enkfid=self.__enkfmem,
        domid=int(domain.get_grid_id()))
    logger.info('Domain %s atime %s enkfmem %s loc %s'%(
        str(domain),str(atime),repr(self.__enkfmem),repr(loc)))
    uf=UpstreamFile(self.dstore,category=self.taskname,
                    prodname=os.path.basename(loc),
                    location=loc)
    uf.check()
    return uf
264 
def get_track(self,atime):
    """!Returns a product for this member's track file.

    Creates a new produtil.datastore.UpstreamFile whose location is
    the trak.hwrf.atcfunix.mem### file under oldcom, where ### is
    this object's enkf member id as three digits.  check() is
    called on the new product before it is returned.
    @param atime the analysis time; it must be the anlouttime
    @return a newly created UpstreamFile, or None if atime is not
      the anlouttime"""
    logger=self.log()
    atime=hwrf.numerics.to_datetime(atime)
    if atime!=self.__anlouttime:
        logger.info('Wrong atime: %s vs %s'
                    %(str(atime),str(self.__anlouttime)))
        return None
    enkfid='%03d'%int(self.__enkfmem)
    loc=self.confstrinterp(
        '{oldcom}/{oldvit[stormnamelc]}{oldvit[stormid3lc]}.{oldvit[YMDH]}.'
        'trak.hwrf.atcfunix.mem'+enkfid)
    logger.info('atime %s enkfmem %s loc %s'%(
        str(atime),repr(self.__enkfmem),repr(loc)))
    uf=UpstreamFile(self.dstore,category=self.taskname,
                    prodname=os.path.basename(loc),
                    location=loc)
    uf.check()
    return uf
283 
284 ########################################################################
285 
287  """! Forecast ensemble member based on the GFS ENKF.
288 
289  Runs one member of an ensemble DA forecast ensemble, using a
290  member of the GFS ENKF ensemble as initial and boundary
291  conditions. Some data from the earlier deterministic forecast
292  jobs is reused to simplify the process."""
293 
def __init__(self,dstore,conf,section,detinit,enkfmem,sim,
             taskname=None,track=None,relocate=None,priorcycle=None,
             **kwargs):
    """!Constructor for FromGFSENKF

    @param dstore the produtil.datastore.Datastore database to use
    @param conf the hwrf.config.HWRFConfig that provides
      configuration data
    @param section the section in conf to use
    @param detinit the deterministic initialization, an
      hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI object.
    @param enkfmem the ensemble member id
    @param sim the hwrf.wrf.WRFSimulation object
    @param taskname the name of the task within the database
    @param track must not be None.  It is converted with bool();
      if true, _make_track() is called.
    @param relocate must not be None.  It is converted with bool();
      if true, _make_relocate() is called.
    @param priorcycle passed to _make_relocate() when relocate is
      true
    @param kwargs passed to the parent class constructor
    @throws TypeError if track or relocate is None"""
    super(FromGFSENKF,self).__init__(dstore,conf,section,taskname,
                                     **kwargs)
    assert(isinstance(sim,hwrf.wrf.WRFSimulation))
    if track is None or relocate is None:
        raise TypeError(
            'You must explicitly specify the track and relocate '
            'arguments to FromGFSENKF.__init__ and they must be boolean '
            'values, not None.')
    self.enkfmem=enkfmem
    self.track=bool(track)
    self.relocate=bool(relocate)
    self.make_wrf(detinit,sim)
    self.make_init(detinit)
    self.make_fcst(detinit)
    if self.track:
        self._make_track()
    if self.relocate:
        self._make_relocate(track,'ENKF',priorcycle)
340 
341  ##@var enkfmem
342  # the enkf member id
343 
344  ##@var fcst
345  # the forecast task, an hwrf.fcsttask.AnalysisCycle
346 
347  ##@var geogrid
348  # the geogrid task from the deterministic initialization
349 
350  ##@var metgrid
351  # the metgrid task from the deterministic initialization
352 
353  ##@var prep
354  # the hwrf.prep.PrepHybrid that processes the GFS ENKF forecast
355  # and analysis spectral data
356 
357  ##@var realinit
358  # the hwrf.fcsttask.RealNMM that generates wrfinput_d01 and
359  # wrfbdy_d01 files for input to the fcst and wrfanl
360 
361  ##@var wrfanl
362  # the hwrf.fcsttask.WRFAnl that generates input wrfanl_d* files
363  # for input to the fcst
364 
@property
def anlintime(self):
    """!The analysis input time.

    @returns the result of simstart() on the simulation that
      make_wrf() stored"""
    return self.__wrf.simstart()
369 
370  def make_wrf(self,detinit,sim):
371  """!Create the wrf() and fcst
372 
373  This function, called from the constructor, creates the
374  hwrf.wrf.WRFSimulation and hwrf.fcsttask.AnalysisCycle used
375  for this forecast.
376  @param detinit the deterministic model initialization, an
377  hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI.
378  @param sim the hwrf.wrf.WRFSimulation passed to the constructor."""
380  self.dstore,self.conf,self.confstr('fcsttask'),sim.copy(),
381  taskname=self.taskname+'.fcst',outdir=self.outdir+'/fcst',
382  workdir=self.workdir+'/fcst',keeprun=True)
383  self.__wrf=sim
384 
385  # WCOSS workaround:
386  self.fcst.sim.set_active_io_form_to(2)
387 
def make_fcst(self,detinit):
    """!Adds input sources to the forecast object.

    Adds metgrid, geogrid, wrfinput, wrfbdy, wrfanl, and coupler
    fort.65 input to the fcst member variable.  A wrfanl input is
    added for every domain of self.sim except the one whose
    is_moad() is true.  Finally, a 'history' output stream is added
    to the forecast's simulation.
    @param detinit the deterministic model initialization, an
      hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI.  It is not
      used by this method."""
    self.fcst\
        .add_geogrid(self.geogrid) \
        .add_metgrid(self.metgrid) \
        .add_fort65(self.realinit) \
        .add_wrfinput(self.realinit) \
        .add_wrfbdy(self.realinit)
    for domain in self.sim:
        if domain.is_moad():
            continue
        self.fcst.add_wrfanl(self.wrfanl,domain)
    self.fcst.sim.add_output('history',start=0,step=3600*6,end=6*3600)
411 
@property
def sim(self):
    """!The wrf simulation stored by make_wrf()

    @returns the simulation object that make_wrf() stored"""
    return self.__wrf
416 
def inputiter(self):
    """!Passes control to the hwrf.prep.PrepHybrid.inputiter().

    Iterates over the prep member's inputiter, a
    hwrf.prep.PrepHybrid.inputiter().  Yields all input data
    information needed by the hwrf.input module to pull input
    data."""
    for needed in self.prep.inputiter():
        yield needed
425 
426  def make_init(self,detinit):
427  """!Creates initialization tasks.
428 
429  Called from the constructor. Creates the initialization
430  tasks, prep, realinit and wrfanl; and links to the
431  deterministic init geogrid and metgrid tasks. This is called
432  by the constructor, to create the needed inputs to the fcst
433  member.
434  @param detinit the deterministic model initialization, an
435  hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI."""
436 
437  self.geogrid=detinit.geogrid
438  #self.realbdy=detinit.realinit
439  self.metgrid=detinit.metgrid
440  wrf=self.sim
441  moad=wrf.get_moad()
442  geodat=self.geogrid.geodat(moad)
443 
444  realsection=self.confstr('realinit')
445  prepsection=self.confstr('prep_hybrid')
446 
448  self.dstore,self.conf,prepsection,self.sim,geodat,
449  atime=self.anlintime,taskname=self.taskname+'.prep',
450  workdir=self.workdir+'/prep',outdir=self.outdir+'/prep')
451 
453  self.dstore,self.conf,realsection,self.sim,
454  taskname=self.taskname+'.real',keeprun=False,
455  outdir=self.outdir+'/real',workdir=self.workdir+'/real') \
456  .add_geogrid(self.geogrid)\
457  .add_metgrid(self.metgrid)\
458  .add_prep_hybrid(self.prep)
459 
460  # WCOSS workaround:
461  self.realinit.sim.set_active_io_form_to(2)
462 
463  self.prep.tvset('enkfmem',self.enkfmem)
464 
465  # We use a WRFGhost instead of a WRFAnl since WRFGhost
466  # disables the history stream.
468  self.dstore,self.conf,realsection,self.sim,False,
469  atime=self.anlintime,taskname=self.taskname+'.wrfanl',
470  workdir=self.workdir+'/wrfanl',outdir=self.outdir+'/wrfanl')\
471  .add_geogrid(self.geogrid) \
472  .add_metgrid(self.metgrid) \
473  .add_fort65(self.realinit) \
474  .add_wrfinput(self.realinit) \
475  .add_wrfbdy(self.realinit)
476 
477  def _make_track(self):
478  """Makes the gribber and tracker member variables."""
479 
480  ensdadoms=[ d for d in self.fcst.sim ]
481  d2=ensdadoms[-1]
482  postdomains=[ d2 ]
483  self.post=PostManyWRF(self.fcst,postdomains,self.conf,
484  self.confstr('post'),6*3600,needcrtm=False,
485  streams=['history'],taskname=self.taskname+'.post',
486  outdir=self.outdir+'/post',workdir=self.workdir+'/post')
487 
488  # Regridding stuff:
489  grid2=hwrf.regrib.igrb1(self.post,domain=d2)
491  'part',hwrf.tracker.tracker_subset,None)
492  domloc=hwrf.regrib.FixedLocation(lat=self.conf['config','domlat'],
493  lon=self.conf['config','domlon'])
494  stormloc=hwrf.regrib.FixedLocation(lat=self.storminfo.lat,
495  lon=self.storminfo.lon)
496  basin=self.storminfo.pubbasin2
497  if ((basin.upper()=='AL' or basin.upper()=='EP') \
498  and domloc.ewcenter<360.0):
499  domloc.ewcenter+=360.0
500  r=hwrf.regrib.RegribMany(copygb=alias(exe(self.conf.getexe('copygb'))),
501  wgrib=alias(exe(self.conf.getexe('wgrib'))))
502  r.sync_frequently=self.conf.getbool('config','sync_frequently',True)
503  r.add('d2',clatlon(grid2,res=[0.045,0.045],size=[30.,30.],
504  scan=136,n=[667,667]))
505  r.add('trkgrid',grid2*r.grid('d2'))
506  r.add('hwrftrk',hwrf.tracker.vinttave(r.GRIB('trkgrid')/trksub))
507  basedir=self.outdir+'/regribber'
508  enkf000='%03d'%int(self.enkfmem)
509  r.to_intercom('{out_prefix}.hwrftrk.grbf{fahr:02d}.mem'+enkf000,'hwrftrk')
511  self.dstore,self.conf,self.confstr('regribber'),r,
512  start=self.anlintime,step=6*3600,end=6*3600,
513  taskname=self.taskname+'.regribber',
514  atime=self.anlintime,workdir=self.workdir+'/regribber')
516  self.dstore,self.conf,self.confstr('tracker'),
517  taskname=self.taskname+'tracker',start=self.anlintime,
518  step=6*3600,end=6*3600,outdir=self.outdir+'/tracker',
519  workdir=self.workdir+'/tracker')
520  self.tracker.add_moving_grid(self.storminfo,self.gribber,'hwrftrk')
521  self.tracker.send_atcfunix(
522  'track0','{com}/{out_prefix}.trak.hwrf.atcfunix.mem'+enkf000)
523 
def products(self,**kwargs):
    """!Iterates over all forecast products.

    Passes control to hwrf.fcsttask.AnalysisCycle.products() to
    iterate over all products that match the specified arguments.
    @param kwargs passed to hwrf.fcsttask.AnalysisCycle.products()"""
    for product in self.fcst.products(**kwargs):
        yield product
532 
533  def run(self):
534  """!Runs the initialization and forecast for this ensemble member.
535 
536  Runs the prep, realinit, wrfanl and fcst member tasks, using input
537  from the GFS ENKF, and the deterministic initialization."""
538  where=self.workdir
539  logger=self.log()
540  logger.info('Run ensemble member in %s'%(where,))
541  with produtil.cd.NamedDir(where,keep=False,keep_on_error=True,
542  logger=logger):
543  self.prep.run()
544  self.realinit.run()
545  self.wrfanl.run()
546  self.fcst.run()
547  if self.track:
548  with self.dstore.transaction() as t:
549  self.post.unrun()
550  self.gribber.unrun()
551  self.post.run()
552  self.gribber.run()
553  if self.gribber.is_completed():
554  self.tracker.run()
555  else:
556  msg='Error regridding inputs to tracker. See earlier log messages for details.'
557  logger.error(msg)
559  self.state=COMPLETED
560 
def _make_relocate_kwargs(self,track,modin,dest_dir,priorcycle):
    """!Makes a dict containing the keyword arguments to send in to
    the constructor for the hwrf.relocate task(s).

    @param track if true, parentTrack=self.tracker and
      trackName='track0' are added to the result
    @param modin the modin argument to the constructor
    @param dest_dir the directory in which to run the relocate
    @param priorcycle if not None, it is added to the result as
      the ensda argument
    @returns the dict of keyword arguments"""
    result=dict(
        sim=self.fcst.sim,domains=list(self.fcst.sim),
        modin=modin,dest_dir=dest_dir,
        workdir=self.workdir+'/relocate',
        outdir=self.outdir+'/relocate')

    if priorcycle is not None:
        result.update(ensda=priorcycle)
    if track:
        result.update(parentTrack=self.tracker,trackName='track0')
    return result
577 
578  def _make_relocate(self,track,modin,priorcycle):
579  """Makes the relocation, rstage1, rstage2 and rstage3 member
580  variables.
581  track - the track argument to the constructor
582  modin - the modin argument to the constructor"""
583 
584  dest_dir=os.path.join(self.workdir,'relocate')
585  kwargs=self._make_relocate_kwargs(track,modin,dest_dir,priorcycle)
586 
588  self.confstr('relocate'),
589  taskname_pattern=self.taskname+'relocate.stage'+'%d',
590  **kwargs)
591 
592  self.rstage1=self.relocation.rstage1
593  self.rstage3=self.relocation.rstage3
594 
def run_relocate(self):
    """!Runs the relocate jobs, if present.

    If this object has an rstage1 attribute, its temporary files
    are deleted and it is run.  If this object has an rstage3
    attribute, it is run, and its temporary files are deleted when
    both rstage1.scrub and rstage3.scrub are true."""
    attributes=self.__dict__
    if 'rstage1' in attributes:
        self.rstage1.delete_temp()
        self.rstage1.run()
    if 'rstage3' in attributes:
        self.rstage3.run()
        if self.rstage1.scrub and self.rstage3.scrub:
            self.rstage3.delete_temp()
604 
605 ########################################################################
def write_ensda_flag_file(flag_file,run_ensda):
    """!Writes the stormX.run_ensda flag file.

    Writes the storm*.run_ensda flag file for this cycle.  The purpose
    of the file is to tell the workflow and scripting layers whether
    the ensemble needs to be run.  The file will contain a single
    line: RUN_ENSDA=YES or RUN_ENSDA=NO.  Will also log a message to
    the jlogfile at INFO level telling which was written.
    @param flag_file the full path to the flag file
    @param run_ensda True or False: should the ENSDA be run?  Only
      the value True itself selects YES; None, False, or anything
      else selects NO."""
    if run_ensda is True:
        line='RUN_ENSDA=YES\n'
        message='Will run HWRF ENSDA for this cycle.'
    else: # None, False, anything else:
        line='RUN_ENSDA=NO\n'
        message='Disabled HWRF ENSDA for this cycle.'
    with open(flag_file,'wt') as f:
        f.write(line)
    produtil.log.jlogger.info(message)
624 
def read_ensda_flag_file(flag_file):
    """!Reads the stormX.run_ensda flag file

    This function is used by the scripting and workflow layers to
    determine if the data assimilation ensemble should be run.  Reads
    the storm*.run_ensda flag file line by line, searching for a
    single line RUN_ENSDA=YES or RUN_ENSDA=NO.  Returns True for YES
    or False for No, based on the last such line seen.  Returns None
    otherwise.
    @param flag_file the full path to the flag file
    @returns True, False or None as described above"""
    run_ensda=None
    with open(flag_file,'rt') as f:
        for line in f:
            # YES is tested first, so a line that contains both
            # strings counts as YES.
            if 'RUN_ENSDA=YES' in line:
                run_ensda=True
            elif 'RUN_ENSDA=NO' in line:
                run_ensda=False
    return run_ensda
642 
643 ########################################################################
645  """!Determines if Tail Doppler Radar (TDR) data is available.
646 
647  This class checks to see if a specified cycle has Tail Doppler
648  Radar data available. This is the condition used to decide
649  whether to run the DA ensemble in the 2015 Operational HWRF."""
650  def __init__(self,dstore,conf,section,cycle_rel,**kwargs):
651  """!CycleTDRCheck constructor.
652 
653  Create a new CycleTDRCheck which will look for TDR for the
654  specified cycle. The cycle_rel is anything accepted by
655  to_datetime_rel's second argument.
656  @param dstore the produtil.datastore.Datastore database object
657  @param conf the hwrf.config.HWRFConfig with configuration info
658  @param section the section to use within conf
659  @param cycle_rel specifies the cycle. This must be a number
660  of hours relative to the current cycle (conf.cycle) analysis
661  time. For example, -6*3600 would be the prior cycle and
662  48*3600 would be two days from now.
663  @param kwargs passed to the superclass constructor"""
664  super(CycleTDRCheck,self).__init__(dstore,conf,section,**kwargs)
665  incat_name=self.confstr('catalog')
666  self.__ensda_flag_file=self.confstr('tdr_flag_file')
667  self.__run_ensda=None
668  self.tgtcycle=to_datetime_rel(cycle_rel,conf.cycle)
670  self.conf,incat_name,self.tgtcycle)
671  dataset=self.confstr('dataset','tdr')
672  item=self.confstr('item','gdas1_bufr')
673  obstype=self.confstr('obstype','tldplr')
674  self.__tdrdict=dict(self.taskvars,dataset=dataset,item=item,
675  obstype=obstype,atime=self.tgtcycle,ftime=self.tgtcycle,
676  optional=True)
677  self._dirname=self.workdir
678  self._stormid='999'
679 
680  ##@var tgtcycle
681  # the cycle for whom TDR data is checked
682 
683  def should_run_ensda(self):
684  """!Should ENSDA be run?
685 
686  If self.run was called in this process, returns the cached
687  result of that. Otherwise, reads the run_ensda flag file from
688  COM. Uses hwrf.ensda.read_ensda_flag_file()"""
689  if self.__run_ensda is None:
691  return self.__run_ensda
692 
def inputiter(self):
    """!Iterates over needed files from upstream workflows.

    This iterator's purpose is to provide "all files external
    to this workflow" that are needed by the task.  In this case,
    of course, it is only the TDR bufr_d file.  Hence, there is a
    single "yield" statement that requests the TDR, using the
    dict that the constructor built."""
    yield self.__tdrdict
701 
def run(self):
    """!Creates the storm*.run_ensda file.

    Creates the storm1.run_ensda flag file with RUN_ENSDA=YES if
    the TDR data is available, and RUN_ENSDA=NO otherwise.  The
    flag file is written even when _actually_run() raises; in that
    case False is written and the exception then propagates."""
    run_ensda=False
    try:
        run_ensda=bool(self._actually_run())
    finally:
        self.write_flag_file(run_ensda)
715 
716  def write_flag_file(self,run_ensda):
717  """!Write the ensda flag file.
718 
719  Calls hwrf.ensda.write_ensda_flag_file to write the flag file.
720  @param run_ensda True means the ensemble should be run, False
721  if it should not be run."""
723 
724  def tdr_this_cycle(self):
725  """!Check if TDR data is available for this cycle
726 
727  Checks the on-disk input data to see if the TDR data is
728  available."""
729  logger=self.log()
730  atime=to_datetime(self.conf.cycle)
732  self.conf,self.confstr('catalog'),atime)
733  item=self.conf.get('tdr_new_obstype','item')
734  ds=os.path.join(self.getdir('intercom'),'bufrprep')
735  it=self._in_catalog.parse(item,atime=atime,
736  logger=logger,obstype='tldplr')
737  there=os.path.join(ds,it)
738  logger.info('TDR bufrprep should be at %s'%there)
739  if isnonempty(there):
740  return True
741  else:
742  return False
743 
def _actually_run(self):
    """!Search on disk for TDR or a trigger file.

    Do not call this routine directly; it is an internal
    implementation routine.  This routine contains the code that
    actually determines if TDR is present or not.  The "run"
    routine calls _actually_run in a try-finally block to ensure
    the run_ensda flag file is created no matter what.

    When self.realtime is true and the directory /dcom/us007003
    exists, the decision comes from tdr_this_cycle() and then
    read_trigger_file().  Otherwise the input catalog is asked to
    locate the TDR file, which must be non-empty.
    @returns True if ENSDA should be enabled, False otherwise"""
    logger=self.log()

    if self.realtime and os.path.isdir('/dcom/us007003'):
        if self.tdr_this_cycle():
            logger.info('TDR data is available for current cycle %s! '
                'Enabling ENSDA.'%self.conf.cycle.strftime('%Y%m%d%H'))
            return True
        elif self.read_trigger_file():
            logger.info('There will be TDR data for cycle %s! '
                'Enabling ENSDA.'%self.tgtcycle.strftime('%Y%m%d%H'))
            return True
        else:
            logger.warning('ensda trigger file is not found. '
                'TDR data is not available for current cycle. '
                'Will continue without ENSDA.')
            return False
    else:
        ic=self.__in_catalog
        there_it_is=ic.locate(**self.__tdrdict)
        if there_it_is is None:
            logger.error(
                'Configuration error: DataCatalog %s does not know how '
                'to find TDR data.  Will continue without ENSDA.'
                %(repr(ic),))
            return False
        elif not isnonempty(there_it_is):
            logger.warning(
                '%s: %s Tail Doppler Radar bufr_d file is empty or '
                'non-existant.  Will continue without ENSDA.'
                %(there_it_is,self.tgtcycle.strftime('%Y%m%d%H')))
            return False
        else:
            logger.info('%s: TDR data found for cycle %s!  Enabling ENSDA.'
                %(there_it_is,self.tgtcycle.strftime('%Y%m%d%H')))
            return True
787 
def read_trigger_file(self):
    """!Read TDR trigger file for operational run

    Reads the TDR trigger file sent by the Aircraft Operations
    Center (AOC) before a NOAA P3 Orion flight.  This is used in
    NCEP Operations when running the operational HWRF, to
    determine if TDR is going to be available soon.

    The storm id is stored in self._stormid.  For target cycles in
    2013 or earlier it is the storm number itself; otherwise it is
    '1', '2' or '3' (for the AL, EP or CP basin) followed by the
    two-digit storm number, or '999' for any other basin.
    @returns True if a non-empty 'runensda' file exists in the work
      directory after the checks, False if it does not or if the
      dcom directory does not exist"""
    logger=self.log()
    atime=to_datetime(self.tgtcycle)
    ymdh=atime.strftime('%Y%m%d%H')
    basin=self.storminfo.pubbasin2
    if int(ymdh[0:4]) <= 2013:
        self._stormid=self.storminfo.stnum
    else:
        prefix={'AL':'1','EP':'2','CP':'3'}.get(basin.upper())
        if prefix is None:
            self._stormid='999'
        else:
            self._stormid='%s%02d' % (prefix,self.storminfo.stnum)

    input_catalog=self.conf.get('config','fcst_catalog')
    dcom=self.conf.get(input_catalog,'dcom','/dcom/us007003')
    if not os.path.isdir(dcom):
        logger.warning('%s does not exist. This is not wcoss. '
            'real-time ensda trigger can only be run on wcoss'%dcom)
        return False

    # tank1 is under the target cycle's date; tank2 is under the date
    # 24 hours earlier and is only consulted for 00Z target cycles.
    btime=to_datetime_rel(-24*3600,atime)
    tank1=os.path.join(dcom,atime.strftime("%Y%m%d"),'b006/xx070')
    tank2=os.path.join(dcom,btime.strftime("%Y%m%d"),'b006/xx070')
    logger.info('Locations: tank1 at %s tank2 at %s'%(tank1,tank2))
    with NamedDir(self._dirname,keep=not self.scrub,logger=logger):
        numtry=self.confint('numofcheck',1)
        timeinv=self.confint('checksecinv',300)
        stime=timeinv/2
        n=1
        while n<=numtry:
            if isnonempty(tank1):
                logger.info('tank1 exist')
                make_symlink(tank1,'tldplrbufr',force=True,logger=logger)
                self.readensdatrigger(self._stormid,ymdh)
            if not isnonempty('runensda') and ymdh[8:10]=='00' \
                    and isnonempty(tank2):
                make_symlink(tank2,'tldplrbufr',force=True,logger=logger)
                self.readensdatrigger(self._stormid,ymdh)
            # There is no wait after the last attempt.
            if n<numtry:
                if wait_for_files(
                        'runensda',logger,
                        maxwait=timeinv,sleeptime=stime,min_size=1,
                        min_mtime_age=5,min_atime_age=None,
                        min_ctime_age=None,min_fraction=1.0):
                    logger.info('found trigger file')
                    # Push n past numtry so the loop ends.
                    n=numtry+1
            n+=1
        if isnonempty('runensda'):
            return True
        else:
            totalwait=timeinv*(numtry-1)/60.0
            logger.info('waited for %s minutes, ensda trigger'
                        ' is not found'%str(totalwait))
            return False
851 
def readensdatrigger(self,stmidin,tgtcycle):
    """!Runs the hwrf_readtdrtrigger program.

    Runs the hwrf_readtdrtrigger program to decide if the TDR
    trigger file is for this storm or not.  The storm id and the
    cycle, separated by one space, are sent to the program's
    standard input, and produtil.run.checkrun is used to run it.
    @param stmidin the storm
    @param tgtcycle the cycle of interest"""
    logger=self.log()
    logger.info('readensdatrigger')
    fprog = 'hwrf_readtdrtrigger'
    prog = self.getexe(fprog)
    # Format rather than concatenate: read_trigger_file() can pass
    # a storm id that is not a string, and "+" would then raise
    # TypeError.
    cmd = produtil.run.exe(prog) << '%s %s'%(stmidin,tgtcycle)
    produtil.run.checkrun(cmd,logger=logger)
865 
866 ########################################################################
868  """!Used in place of CycleTDRCheck to force ENSDA to always run.
869 
870  This subclass of CycleTDRCheck instructs ENSDA to run whether
871  TDR is available or not."""
def should_run_ensda(self):
    """!Returns True.

    Always returns True, indicating that ENSDA should always be
    run even if the world is imploding and pigs are raining from
    clouds of cotton candy.
    @returns True"""
    return True
def _actually_run(self):
    """!Runs the TDR check and ignores its results.

    Calls the superclass _actually_run, so that the TDR check is
    done, and then returns True regardless of the TDR
    availability.  When the superclass check returned a false
    value, logs a warning about the override to this task's logger
    and to the produtil.log.jlogger.
    @returns True"""
    tdr_found=super(AlwaysRunENSDA,self)._actually_run()
    if not tdr_found:
        msg="OVERRIDE: Will run ENSDA anyway due to "\
            "configuration settings."
        self.log().warning(msg)
        jlogger.warning(msg)
    return True
894 
895 ########################################################################
896 def enada_pre_object_for(ds,conf,section,next_cycle):
897  """!Generates a CycleTDRCheck or AlwaysRunENSDA based on
898  configuration settings.
899 
900  Reads the [config] section ensda_when option to decide what TDR
901  check class should be used.
902  * ensda_when="tdr_next_cycle" - create a CycleTDRCheck
903  * ensda_when="always" - create an AlwaysRunENSDA
904  * ensda_when=anything else - raise an exception
905  @param ds,conf,section,next_cycle - passed to the constructor"""
906  ensda_when=conf.getstr('config','ensda_when','tdr_next_cycle').lower()
907  if ensda_when=='tdr_next_cycle':
909  ds,conf,'tdrcheck',next_cycle)
910  elif ensda_when=='always':
912  ds,conf,'tdrcheck',next_cycle)
913  else:
914  raise ValueError('The ensda_when option must be set to tdr_next_cycle or always (case-insensitive) not %s.'%(repr(ensda_when),))
915 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
Create namelists, monitor wrf simulations, generate filenames.
Definition: wrf.py:1
def get_wrfinput(self, domain, atime)
return the product for the specified domain and analysis time
Definition: ensda.py:222
def should_run_ensda(self)
Should ENSDA be run?
Definition: ensda.py:683
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
def tdr_this_cycle(self)
Check if TDR data is available for this cycle.
Definition: ensda.py:724
Runs regrib operations on many input times, sending output to an hwrf.gribtask.GRIBTask.
Definition: regrib.py:311
def dump(self)
print detailed diagnostics
Definition: ensda.py:163
runs wrf for an analysis cycle
Definition: fcsttask.py:1675
A wrapper around PostOneWRF that posts many WRF output times.
Definition: post.py:514
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
Represents a two-dimensional ensemble-vs-time array of hwrf.hwrftask.HWRFTask objects.
Definition: ensda.py:33
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def inputiter(self)
Passes control to the hwrf.prep.PrepHybrid.inputiter().
Definition: ensda.py:417
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
This task runs the GFDL Vortex Tracker on HWRF output.
Definition: tracker.py:649
conf
This HWRFTask's hwrf.config.HWRFConfig object.
Definition: hwrftask.py:415
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def nmembers(self)
The number of members of the ensemble.
Definition: ensda.py:75
a HWRFTask subclass that runs real_nmm
Definition: fcsttask.py:1001
storminfo
The hwrf.storminfo.StormInfo describing the vitals information for the storm processed by this HWRFTa...
Definition: hwrftask.py:94
prep
the hwrf.prep.PrepHybrid that processes the GFS ENKF forecast and analysis spectral data ...
Definition: ensda.py:447
def run(self)
creates the storm*.run_ensda file
Definition: ensda.py:702
metgrid
the metgrid task from the deterministic initialization
Definition: ensda.py:439
def __init__(self, dstore, conf, section, domains, enkfmem, anlouttime, kwargs)
FromPriorCycle constructor.
Definition: ensda.py:186
def inputiter(self)
iterate over all needed input data, for hwrf.input
Definition: ensda.py:152
def make_fcst(self, detinit)
Adds input sources to the forecast object.
Definition: ensda.py:388
def make_init(self, detinit)
Creates initialization tasks.
Definition: ensda.py:426
def set_member(self, atime, enkfmem, task)
sets the HWRFTask to use for one cycle of one member
Definition: ensda.py:106
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
Definition: run.py:398
def should_run_ensda(self)
Returns True.
Definition: ensda.py:872
tgtcycle
the cycle for which TDR data is checked
Definition: ensda.py:668
def write_flag_file(self, run_ensda)
Write the ensda flag file.
Definition: ensda.py:716
def __init__(self, dstore, conf, section, detinit, enkfmem, sim, taskname=None, track=None, relocate=None, priorcycle=None, kwargs)
Constructor for FromGFSENKF.
Definition: ensda.py:296
def _make_track(self)
Definition: ensda.py:477
geogrid
the geogrid task from the deterministic initialization
Definition: ensda.py:437
A GRIBBase that subsets GRIB files, keeping only certain parameters.
Definition: regrib.py:1366
enkfmem
the enkf member id
Definition: ensda.py:330
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
def run(self)
Runs the initialization and forecast for this ensemble member.
Definition: ensda.py:533
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
PrepHybrid runs the prep_hybrid program to transform GFS spectral files to the HWRF grid...
Definition: prep.py:1
def anlintime(self)
The analysis input time.
Definition: ensda.py:366
Runs the real_nmm or wrf executables.
Definition: fcsttask.py:1
def __init__(self, dstore, conf, section, cycle_rel, kwargs)
CycleTDRCheck constructor.
Definition: ensda.py:650
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
def to_datetime(d)
Converts the argument to a datetime.
Definition: numerics.py:346
def write_ensda_flag_file(flag_file, run_ensda)
Writes the stormX.run_ensda flag file.
Definition: ensda.py:606
def sim(self)
The wrf simulation made by make_wrf()
Definition: ensda.py:413
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
Obtains input data needed by various subclasses of hwrf.hwrftask.HWRFTask.
Definition: input.py:1
Time manipulation and other numerical routines.
Definition: numerics.py:1
realinit
the hwrf.fcsttask.RealNMM that generates wrfinput_d01 and wrfbdy_d01 files for input to the fcst and ...
Definition: ensda.py:452
Forecast ensemble member based on the GFS ENKF.
Definition: ensda.py:286
An hwrf.hwrftask.HWRFTask that performs regribbing operations.
Definition: gribtask.py:46
def _make_relocate(self, track, modin, priorcycle)
Definition: ensda.py:578
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def confint
Alias for self.conf.getint for section self.section.
Definition: hwrftask.py:248
def steps_for_member(self, enkfmem)
iterate over analysis cycles for a specific member
Definition: ensda.py:135
def products(self, kwargs)
Iterates over all forecast products.
Definition: ensda.py:524
Represents a specific location on the earth as a latitude, longitude pair.
Definition: regrib.py:1300
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def member_ids(self)
Iterates over all member ids.
Definition: ensda.py:101
Represents an ensemble member from the previous forecast cycle.
Definition: ensda.py:180
def anlouttime(self)
The time at the end of the last ensemble step.
Definition: ensda.py:70
def anlintime(self)
The time at the beginning of the first ensemble step.
Definition: ensda.py:65
def anlouttimes(self)
Iterates over all ensemble analysis output times.
Definition: ensda.py:91
def inputiter(self)
Iterates over needed files from upstream workflows.
Definition: ensda.py:693
def members_at_time(self, atime)
iterate over members for a specified analysis input time
Definition: ensda.py:117
A time-indexed array that can only handle equally spaced times.
Definition: numerics.py:689
def enada_pre_object_for(ds, conf, section, next_cycle)
Generates a CycleTDRCheck or AlwaysRunENSDA based on configuration settings.
Definition: ensda.py:896
def members_at_anlouttime(self)
iterate over members at the final analysis output time.
Definition: ensda.py:127
Configures logging.
Definition: log.py:1
def nsteps(self)
The number of ensemble DA time steps.
Definition: ensda.py:80
def member(self, atime, enkfmem)
get the analysis cycle for the specified member and time
Definition: ensda.py:143
def _actually_run(self)
Search on disk for TDR or a trigger file.
Definition: ensda.py:744
def read_trigger_file(self)
Read TDR trigger file for operational run.
Definition: ensda.py:788
def get_product(self, domain, atime)
Returns a product for the specified domain and time.
Definition: ensda.py:234
wrfanl
the hwrf.fcsttask.WRFAnl that generates input wrfanl_d* files for input to the fcst ...
Definition: ensda.py:467
vinttave
An alias for GRIB1VintTave.
Definition: tracker.py:388
Runs the Unified Post Processor on outputs from the WRF-NMM, producing E grid GRIB files as EGRIB1Pro...
Definition: post.py:1
This module contains tasks to prepare input for the GFDL Vortex Tracker, run the tracker and deliver ...
Definition: tracker.py:1
Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp...
Definition: input.py:109
def _make_relocate_kwargs(self, track, modin, dest_dir, priorcycle)
Definition: ensda.py:561
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
Definition: run.py:242
generate and manipulate wrf namelists, predict output filenames
Definition: wrf.py:663
def products(self, domains=None, kwargs)
Iterates over all products.
Definition: ensda.py:204
def iteritems(self)
Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location"...
Definition: datastore.py:677
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
Describes regribbing operations using an algebraic structure.
Definition: regrib.py:1
def make_wrf(self, detinit, sim)
Create the wrf() and fcst.
Definition: ensda.py:370
def igrb1(task, kwargs)
This is a convenient alias for the GRIB1Selector constructor.
Definition: regrib.py:1956
runs a short WRF simulation to generate wrfanl files named "ghost"
Definition: fcsttask.py:1300
Exceptions for hwrf.regrib.GRIBTask for certain internal errors.
Definition: exceptions.py:292
def read_ensda_flag_file(flag_file)
Reads the stormX.run_ensda flag file.
Definition: ensda.py:625
def anlintimes(self)
Iterates over all ensemble analysis input times.
Definition: ensda.py:85
Runs the prep_hybrid program on GFS spectral files.
Definition: prep.py:23
def run_relocate(self)
Definition: ensda.py:595
def taskvars(self)
The dict of object-local values used for string substitution.
Definition: hwrftask.py:243
def realtime(self)
Is this job a real-time forecast job?
Definition: hwrftask.py:180
def get_wrfanl(self, domain, atime)
returns the wrfanl file's product
Definition: ensda.py:231
Determines if Tail Doppler Radar (TDR) data is available.
Definition: ensda.py:644
def __init__(self, dstore, conf, section, anlintime=None, taskname=None, kwargs)
Constructor for DAEnsemble.
Definition: ensda.py:37
fcst
the forecast task, an hwrf.fcsttask.AnalysisCycle
Definition: ensda.py:379
def readensdatrigger(self, stmidin, tgtcycle)
Runs the hwrf_readtdrtrigger program.
Definition: ensda.py:852
Represents a Product created by an external workflow.
Definition: datastore.py:915
Used in place of CycleTDRCheck to force ENSDA to always run.
Definition: ensda.py:867
This represents all three stages of the relocate.
Definition: relocate.py:2189