HWRF  trunk@4391
post.py
1 """!Runs the Unified Post Processor on outputs from the WRF-NMM,
2 producing E grid GRIB files as EGRIB1Product objects. """
3 
##@var __all__
# Public names exported by "from hwrf.post import *".
__all__=[
    'PostOneWRF',
    'PostManyWRF',
    'EGRIB1Product',
    'check_post',
    'link_post_fix',
]
8 
9 import glob,os,os.path,stat,time,math, re
10 
11 
13 import produtil.locking
15 
16 from hwrf.exceptions import PostFailed
17 from hwrf.regrib import GRIB1File,UpstreamGRIB1
18 
19 from produtil.datastore import FileProduct,COMPLETED,FAILED,RUNNING,UNSTARTED
20 from produtil.cd import TempDir
21 from produtil.fileop import *
22 from produtil.run import *
23 
24 from hwrf.hwrftask import HWRFTask
25 from hwrf.numerics import *
26 
27 ########################################################################
28 
def check_post(retval,what,logger):
    """!Did the post run successfully in the current working directory?

    Checks the current working directory and the specified return
    value retval from the post to determine if the post succeeded.
    Returns a four-element tuple (ok,cenla,cenlo,filename) where "ok"
    is True if the post succeeded, (cenla,cenlo) is the domain center
    and filename is the name of the post output file.

    Because the post's exit status is not reliable, a non-zero retval
    only counts as a failure if vpost.log is missing or its last
    100 kB do not contain "ALL GRIDS PROCESSED".  The domain center is
    parsed from "clat=" and "clon=" assignments in (roughly) the first
    100 kB of this-domain-center.ksh.inc, and the output file is the
    first match of the glob WRFPRS*.

    If the post clearly failed, (False,None,None,None) is returned.  If
    only the domain center could not be parsed, ok is False but the
    other elements are still filled in.
    @param retval the post return value
    @param what for log messages: what was posted?
    @param logger the logging.Logger for log messages
    @returns the tuple (ok,cenla,cenlo,filename)"""
    if retval!=0:
        # The post return value is not reliable, so verify the
        # post really failed by checking for its completion message.
        logger.warning('%s:Non-zero exit status %d from post.'%(what,retval))
        if not os.path.exists('vpost.log'):
            logger.warning('%s: vpost.log does not exist. Post failed.'
                           %(what,))
            return (False,None,None,None)
        # Check the last 100kb or so for "ALL GRIDS PROCESSED".  The
        # log is read in binary mode, so search with a bytes literal;
        # this works on both Python 2 and Python 3.
        with open('vpost.log','rb') as f:
            f.seek(0,os.SEEK_END)
            filesize=f.tell()
            readsize=min(102400,filesize)
            f.seek(-readsize,os.SEEK_END)
            vpostdat=f.read(readsize)
        if vpostdat.find(b'ALL GRIDS PROCESSED')==-1:
            logger.warning(
                '%s: Did not find "ALL GRIDS PROCESSED" in last %d '
                'bytes of vpost.log (file size %d). Post probably '
                'failed'%(what,readsize,filesize))
            for lin in vpostdat.splitlines()[-50:]:
                logger.warning('vpost.log: %s'%(
                    lin.decode('ascii','replace'),))
            return (False,None,None,None)

    # At this point, it seems likely that the post THINKS it succeeded.
    # Get the cenla/cenlo from the first 100kB of
    # this-domain-center.ksh.inc
    centerfile='this-domain-center.ksh.inc'
    if not (os.path.isfile(centerfile) and os.path.getsize(centerfile)>0):
        # This file is deleted before each post run (del_post_output),
        # so its absence means the post itself did not finish.
        logger.error(
            '%s: this-domain-center.ksh.inc is empty or does not exist. '
            'Post failed.'%(what,))
        run(exe('ls')['-ltr'])
        run(exe('tail')['-20','vpost.log'])
        return (False,None,None,None)

    cenla=None
    cenlo=None
    sentexc=False  # only log the first parse exception
    center_re=re.compile(r'(clat|clon)\s*=\s*([+-]?[.0-9]+)')
    with open(centerfile,'rt') as f:
        for line in f.readlines(102400):
            try:
                m=center_re.search(line)
                if m:
                    (varr,where)=m.groups()
                    if varr=='clat': cenla=float(where)
                    if varr=='clon': cenlo=float(where)
            except (KeyError,ValueError,TypeError,AttributeError):
                if not sentexc:
                    sentexc=True
                    logger.warning(
                        '%s: Exception while parsing this-domain-'
                        'center.ksh.inc'%(what,),exc_info=True)
    if cenla is None:
        logger.warning('%s: Could not get clat from this-domain-'
                       'center.ksh.inc'%(what,))
    if cenlo is None:
        logger.warning('%s: Could not get clon from this-domain-'
                       'center.ksh.inc'%(what,))
    ok=cenla is not None and cenlo is not None

    # Now we need to find the post output file.
    for filename in glob.glob('WRFPRS*'):
        logger.info('%s: success: cenla=%s cenlo=%s file=%s.'
                    %(what,cenla,cenlo,filename))
        return (ok,cenla,cenlo,filename)
    logger.warning('%s: No WRFPRS* files found.'%(what,))
    run(exe('ls')['-ltr'])
    run(exe('tail')['-20','vpost.log'])
    return (False,None,None,None)
106 
107 ########################################################################
108 
def link_post_fix(fixd,needcrtm,logger=None,copy=False):
    """!Links or copies all fix files for the post to the current
    working directory.

    The microphysics lookup table hwrf-wrf/ETAMPNEW_DATA.expanded_rain
    is always delivered as a local file (never a symlink) under the
    names eta_micro_lookup.dat, nam_micro_lookup.dat and
    hires_micro_lookup.dat.  If needcrtm is true, the big-endian CRTM
    2.0.6 SpcCoeff and TauCoeff files for each listed sensor, plus the
    Aerosol, Emis and Cloud coefficient files, are symlinked (or copied,
    see copy) into the current directory.
    @param fixd the fix directory
    @param needcrtm flag: is the CRTM data needed?
    @param logger the logging.Logger for log messages, or None
    @param copy if True, copy the CRTM files instead of symlinking
      them (passed through to make_symlinks_in)"""
    if logger is not None:
        logger.info('%s: link post fix files here. Needcrtm=%s'%(
            str(os.getcwd()),repr(needcrtm)))

    # Note about micro lookups: the code opens these for read/write,
    # not read, so we cannot link.  We have to create a local copy.
    # The preserve_perms=False prevents us from turning off write
    # access if the fix file copy is read-only.
    micro_src=os.path.join(fixd,'hwrf-wrf','ETAMPNEW_DATA.expanded_rain')
    for tgt in ( 'eta_micro_lookup.dat', 'nam_micro_lookup.dat',
                 'hires_micro_lookup.dat' ):
        produtil.fileop.deliver_file(micro_src,tgt,logger=logger,keep=True,
                                     preserve_perms=False)

    if not needcrtm:
        return

    # CRTM coefficient files:
    crtmd=os.path.join(fixd,'hwrf-crtm-2.0.6')
    sensors=( "amsre_aqua", "imgr_g11", "imgr_g12", "imgr_g13",
              "imgr_g15", "imgr_mt1r", "imgr_mt2", "seviri_m10",
              "ssmi_f13", "ssmi_f14", "ssmi_f15", "ssmis_f16",
              "ssmis_f17", "ssmis_f18", "ssmis_f19", "ssmis_f20",
              "tmi_trmm", "v.seviri_m10", "imgr_insat3d" )
    links=[]
    for sensor in sensors:
        # One SpcCoeff and one TauCoeff file per sensor, in that order.
        for coeff in ( 'SpcCoeff', 'TauCoeff' ):
            links.append('%s/%s/Big_Endian/%s.%s.bin'
                         %(crtmd,coeff,sensor,coeff))
    for kind in ( 'Aerosol','Emis','Cloud' ):
        links.append('%s/%sCoeff/Big_Endian/%sCoeff.bin'%(crtmd,kind,kind))
    make_symlinks_in(links,'.',logger=logger,force=True,
                     copy=copy)
146 
147 ########################################################################
148 
##@var wrf_hr_min
# Module-level alias for fcst_hr_min (brought in by the star imports at
# the top of this module).  PostManyWRF.taskname_for() calls it as
# wrf_hr_min(time, simstart) to obtain the (hours, minutes) between a
# forecast time and the simulation start.
wrf_hr_min = fcst_hr_min
153 
155  """!Represents an E grid WRF-NMM GRIB1 file, and stores two
156  metadata values: CENLA and CENLO which contain the domain center
157  location."""
def deliver(self,location,fileinfo,logger=None):
    """!Copies the file to its destination, and sets the CENLA and
    CENLO metadata values to the domain center.

    The metadata is read and checked before any file is touched:
    deliver_file is called with keep=False, so the source may be moved
    away and a later failure would otherwise lose it.  The destination
    directory is created if needed.  In one datastore transaction the
    CENLA/CENLO metadata is stored and the product is marked available
    at location; the cached metadata is then refreshed and the
    product's callbacks are called.
    @param location the destination location
    @param fileinfo a dict containing fromloc, the location;
      CENLA, the domain center latitude; and CENLO, the domain
      center longitude.
    @param logger a logging.Logger for log messages"""
    assert(location is not None)
    cenla=fileinfo['CENLA']
    cenlo=fileinfo['CENLO']
    assert(cenla is not None)
    assert(cenlo is not None)
    dirname=os.path.dirname(location)
    # An empty dirname means "current directory": nothing to create.
    if dirname and not os.path.isdir(dirname):
        try:
            os.makedirs(dirname)
        except EnvironmentError:
            # Another process may have created it concurrently.
            if not os.path.isdir(dirname):
                raise
    deliver_file(fileinfo['fromloc'],location,keep=False,logger=logger)
    with self.dstore.transaction():
        self['CENLA']=cenla
        self['CENLO']=cenlo
        self.set_loc_avail(location,True)
    self.update()
    assert('CENLA' in self)
    assert('CENLO' in self)
    self.call_callbacks(logger=logger)
def make_location(self,outdir):
    """!Builds the delivery path outdir/category/prodname for this product.
    @param outdir the output directory
    @returns the joined path"""
    category_dir=os.path.join(outdir,self.category)
    return os.path.join(category_dir,self.prodname)
def make(self,regrib,*args,**kwargs):
    """!Delivers this product's file to a regrib temporary file.

    The temporary name comes from regrib.gribtemp() with the prefix
    "prod." plus the basename of this product's location.
    @param regrib the hwrf.regrib.Regrib with input information
    @param args,kwargs ignored
    @returns a hwrf.regrib.GRIB1File for the temporary file, carrying
      this product's CENLA/CENLO domain center"""
    source=self.location
    tmpname,_index=regrib.gribtemp('prod.'+os.path.basename(source))
    produtil.fileop.deliver_file(source,tmpname,logger=regrib.logger)
    return GRIB1File(tmpname,None,None,self['CENLA'],self['CENLO'])
def getnscenter(self):
    """!Returns the domain center latitude: the CENLA metadata value."""
    latitude=self['CENLA']
    return latitude
def getewcenter(self):
    """!Returns the domain center longitude: the CENLO metadata value."""
    longitude=self['CENLO']
    return longitude
## Read-only property wrapping getnscenter(): the CENLA domain center latitude
nscenter=property(fget=getnscenter,
                  doc='Returns None or the center latitude of this GRIB1 file.')
## Read-only property wrapping getewcenter(): the CENLO domain center longitude
ewcenter=property(fget=getewcenter,
                  doc='Returns None or the center longitude of this GRIB1 file.')
209 
210 ########################################################################
211 
"""!This is an HWRFTask that post-processes output data for a
single WRF stream, from several WRF domains at a given time."""
def __init__(self,wrf,domains,conf,section,time,stream='history',
             needcrtm=True,grib=1,faketime=None,taskname=None,**kwargs):
    """!PostOneWRF constructor

    Looks up the single WRF output product for each domain at the
    requested time and stream, and creates one EGRIB1Product per domain
    (named by product_name()) to receive the post output.
    @param wrf the hwrf.fcsttask.WRFTaskBase or subclass, whose
      data is being posted
    @param domains list of domains to post, as hwrf.wrf.WRFDomain
      objects; must be non-empty
    @param conf the hwrf.config.HWRFConfig that provides configuration
      information.  NOTE: not consulted here; the task is initialized
      with wrf.conf instead.
    @param section the config section in conf
    @param time the forecast time being posted, interpreted relative
      to the WRF simulation start
    @param stream the WRF stream to post
    @param needcrtm do we need CRTM fix files?
    @param grib GRIB version: 1 or 2.  Only 1 is implemented; 2 raises
      NotImplementedError.
    @param faketime unused
    @param taskname the task name in the database
    @param kwargs additional keyword arguments passed to
      hwrf.hwrftask.HWRFTask.__init__"""
    super(PostOneWRF,self).__init__(wrf.dstore,wrf.conf,section,
                                    taskname=taskname,**kwargs)
    self.__needcrtm=needcrtm
    self.__wrf=wrf
    self.__time=to_datetime_rel(time,wrf.wrf().simstart())
    self.__domains=domains
    self.__stream=stream
    self.__wrfproducts={}
    self.__myproducts={}
    self.__grib2 = (grib==2)
    assert(domains)
    if grib==2:
        raise NotImplementedError(
            'GRIB2 support not yet implemented in hwrf.post')
    # These do not depend on the domain, so check them once up front.
    assert(stream is not None)
    assert(time is not None)
    logger=self.log()
    added=False
    for domain in self.__domains:
        first=True
        for product in wrf.products(domains=[domain],stream=stream,
                                    time=self.__time):
            # Exactly one WRF product per domain is expected.
            assert(first)
            first=False
            added=True
            self.__wrfproducts[domain]=product
            logger.debug('%s added %s => %s'%(self.taskname,
                repr(domain),repr(product)))
            prodname=self.product_name(domain)
            self.__myproducts[domain]=\
                EGRIB1Product(self.dstore,category=self.taskname,
                              prodname=prodname)
    assert(added)
def product_name(self,domain):
    """!Builds the product name used for filenames and product ids.

    The name is "<taskname>.<ext>" when domain is None, or
    "<taskname>-<domain.name>.<ext>" otherwise, where ext is "egrb2"
    for GRIB2 and "egrb" for GRIB1.  Every character that is not a
    letter, digit, underscore, period or hyphen becomes an underscore.
    @param domain the domain of interest, or None
    @returns the sanitized product name"""
    suffix='egrb2' if self.__grib2 else 'egrb'
    if domain is not None:
        raw='%s-%s.%s'%(self.taskname,domain.name,suffix) # nonsatpost-moad.egrb
    else:
        raw='%s.%s'%(self.taskname,suffix) # nonsatpost.egrb
    return re.sub('[^a-zA-Z0-9_.-]','_',raw)
def wrf(self):
    """!Returns the WRFSimulation object, obtained from the WRF task's wrf()."""
    wrftask=self.__wrf
    return wrftask.wrf()
def wrftask(self):
    """!Returns the Task, passed to the constructor, that runs the WRF simulation."""
    task=self.__wrf
    return task
def products(self,*args,**kwargs):
    """!Iterates over this task's EGRIB1Product objects.

    - domains=[...] in kwargs: yields the products of the listed
      domains that this task posts, in the listed order.
    - domain=X in kwargs, with X posted by this task: yields only X's
      product.
    - otherwise, including domain=X for a domain this task does not
      post: yields every product.
    @param args,kwargs argument list; positional arguments are ignored"""
    mine=self.__myproducts
    if 'domains' in kwargs:
        for dom in kwargs['domains']:
            if dom in mine:
                yield mine[dom]
        return
    if 'domain' in kwargs and kwargs['domain'] in mine:
        yield mine[kwargs['domain']]
        return
    for dom in mine:
        yield mine[dom]
def make_control(self,stream):
    """!Makes the post control file for the specified stream as
    the file fort.14 in the local directory.

    The control file is chosen from this task's config section: the
    stream-specific option "<stream>_control" (ie.: "auxhist2_control")
    is used when present, otherwise the "control" option.  Any existing
    fort.14, including a dangling symlink left by an earlier run, is
    removed, and the control file is then delivered to fort.14 with
    keep=True so the original is left in place.
    @param stream the WRF output stream whose control file is wanted"""
    Missing='**MISSING**'
    logger=self.log()
    control=self.confget('%s_control'%(stream,),Missing)
    if control is Missing:
        control=self.confstr('control')
    logger.debug('Use control file %s'%(control,))
    # lexists (unlike exists) is also true for a broken symlink.
    if os.path.lexists('./fort.14'):
        produtil.fileop.remove_file('fort.14',logger=logger)
    produtil.fileop.deliver_file(control,'fort.14',keep=True,logger=logger)
314 
def link_fix(self):
    """!Links (or, if the link_wrf_fix option is false, copies) the
    post fix files into the current directory via link_post_fix(),
    using the FIXhwrf directory and this task's needcrtm setting."""
    fixdir=self.getdir('FIXhwrf')
    crtm_wanted=self.__needcrtm
    should_link=self.confbool('link_wrf_fix',True)
    link_post_fix(fixdir,crtm_wanted,logger=self.log(),
                  copy=(not should_link))
321 
def requested_time(self):
    """!Returns the forecast time being posted (the constructor's time,
    converted relative to the simulation start)."""
    posttime=self.__time
    return posttime
def domains(self):
    """!Generator over the domains this task posts, in constructor order."""
    for dom in self.__domains:
        yield dom
def domprod(self):
    """!Generator over (domain,wrfproduct,myproduct) tuples, one per domain.

    domain is the WRF domain, wrfproduct is that domain's WRF output
    Product for the chosen time, and myproduct is the EGRIB1Product
    that receives the post output for that domain."""
    wrfprods=self.__wrfproducts
    myprods=self.__myproducts
    for dom in self.__domains:
        yield (dom,wrfprods[dom],myprods[dom])
def del_post_output(self):
    """!Removes post output from the current working directory: every
    WRFPRS* file and, if present, this-domain-center.ksh.inc."""
    for stale in glob.glob('./WRFPRS*'):
        os.unlink(stale)
    centerfile='this-domain-center.ksh.inc'
    if os.path.exists(centerfile):
        os.unlink(centerfile)
def check_post(self,retval,what):
    """!Calls the module-level check_post() with this task's logger to
    see if the post succeeded in the current working directory.
    @param retval the post return value
    @param what String description of what the post was run on (for log messages)
    @returns the four-element tuple (ok,cenla,cenlo,filename) from
      check_post(): ok is True if the post succeeded, (cenla,cenlo)
      is the domain center and filename is the post output file"""
    return check_post(retval,what,self.log())
def can_run(self):
    """!Reports whether the post has what it needs to run.

    A domain is fine when its output product is already available, or
    when its WRF input product is available (after one check() if it
    was not).  The first domain that fails both tests is logged at
    debug level and makes the result False.
    @returns True if every domain is fine, False otherwise"""
    for domain,wrfprod,myprod in self.domprod():
        if myprod.available:
            continue
        if wrfprod.available:
            continue
        wrfprod.check()
        if wrfprod.available:
            continue
        self.log().debug(
            '%s: cannot run: %s not available (loc=%s avail=%s)'%(
                self.taskname, wrfprod.did,
                repr(wrfprod.location), repr(wrfprod.available)))
        return False
    return True
def run(self, nosleep=False, raiseall=False):
    """!Runs the post for one forecast time and all WRF domains.

    Returns immediately if the task is already COMPLETED or FAILED.
    Otherwise it works inside a temporary directory below the current
    one, after linking the fix files there, and repeatedly loops over
    the domains:
    - a domain is done once its EGRIB1Product is available;
    - if its WRF input is not available yet, the filesystem is synced
      (at most every 30 seconds unless sync_frequently is set) and the
      input is re-checked;
    - once the input is available, the post is run on it and
      check_post() decides whether to deliver the result.  Each domain
      gets at most five tries, after which it is given up on.
    When every domain is done or given up on, the state is set to
    FAILED if any domain was given up on, and COMPLETED otherwise.

    @param nosleep If True, return instead of sleeping 30 seconds when
      some domains are still unfinished after a pass over all domains.
    @param raiseall If True, raise instead of retrying: PostFailed when
      the post fails and PostHasNoInput when a WRF input is
      unavailable.  Exceptions raised while running the post are
      logged and re-raised regardless of this flag."""
    state=self.getstate()
    logger=self.log()
    sync_frequently=self.confbool('sync_frequently',True)
    lastsync=0
    if state==COMPLETED or state==FAILED:
        return
    with TempDir(prefix='post.%s.'%(self.product_name(None),),dir='.',
                 suffix='.work',logger=logger):
        logger.info('cwd: '+os.getcwd())
        assert(not re.match(r'\A/tmp',os.getcwd()))
        self.link_fix()
        done=set()    # domains whose products are known to be delivered
        gaveup=set()  # domains the post gave up on
        tries=dict()  # number of post attempts per domain
        count=len(self.__domains) # total number of products
        outdir=self.outdir
        while len(done)+len(gaveup)<count:
            message='status: '
            for domain,wrfproduct,myproduct in self.domprod():
                what=domain.name
                if domain in done or myproduct.is_available():
                    done.add(domain)
                    message+='[%s: done] '%(domain.name,)
                    # Skip the retry logic below: otherwise a finished
                    # domain whose input is still available would be
                    # posted again on every pass, and after five passes
                    # it would also be counted in "gaveup".
                    continue

                if not wrfproduct.is_available():
                    # Sync so that files written by WRF become visible.
                    if sync_frequently:
                        should_sync=True
                    else:
                        should_sync=int(time.time())>lastsync+30
                    if should_sync:
                        logger.info('Calling sync...')
                        if sync_frequently:
                            # Run sync on compute node if Cray:
                            runsync()
                        # Run sync on batch node if Cray or on
                        # compute node everywhere else:
                        run(exe('sync'),logger=logger)
                        logger.info('Returned from sync.')
                        lastsync=int(time.time())
                    wrfproduct.check()

                if domain in gaveup:
                    pass
                elif tries.get(domain,0)>=5:
                    gaveup.add(domain)
                elif wrfproduct.is_available():
                    try:
                        tries[domain]=tries.get(domain,0)+1
                        self.del_post_output()
                        make_symlink(wrfproduct.location,'./INFILE',
                                     force=True,logger=logger)
                        self.make_control(wrfproduct['stream'])
                        wrf_base=os.path.basename(wrfproduct.location)
                        match=re.search(
                            r'(\d\d\d\d.\d\d.\d\d.\d\d.\d\d.\d\d)',
                            wrf_base)
                        if match:
                            datestamp=match.group(1)
                        else:
                            datestamp=self.__time.strftime(
                                '%Y-%m-%d_%H:%M:%S')
                            # NOTE: raiseall=True should not raise
                            # here since this is a normal, expected,
                            # condition when running the post on the
                            # 1 minute forecast in the JHWRF_INIT jobs.
                            logger.warning(
                                '%s: %s: cannot get datestamp from this '
                                'name; will guess %s'%\
                                (what,wrf_base,datestamp))
                        # Workaround needed for pnetcdf support:
                        # normalize the separators to
                        # YYYY-MM-DD_HH:MM:SS.
                        datestamp = datestamp[0:4]+'-'+datestamp[5:7]+\
                            '-'+datestamp[8:10]+'_'+datestamp[11:13]+\
                            ':'+datestamp[14:16]+':'+datestamp[17:19]
                        cmd = mpirun(mpi(self.getexe('post')),
                                     allranks=True) > 'vpost.log'
                        logger.info('Post command: %s'%(repr(cmd),))
                        with open('itag','wt') as itag:
                            itag.write('INFILE\nnetcdf\n%s\nNMM NEST\n'
                                       % (datestamp,))
                        run(batchexe('sync'))
                        if sync_frequently:
                            runsync()
                        ret = run(cmd)
                        (ok,cenla,cenlo,filename) = self.check_post(
                            ret,what)
                        if ok:
                            logger.info('%s: deliver'%(what,))
                            myproduct.deliver(
                                myproduct.make_location(outdir),
                                {'CENLA':cenla, 'CENLO':cenlo,
                                 'fromloc':filename},logger=logger)
                            done.add(domain)
                            message+='[%s: just posted] '%(domain.name,)
                        elif raiseall:
                            msg='%s: failed'%(what,)
                            logger.warning(msg)
                            raise PostFailed(msg)
                        else:
                            logger.info('%s: failed'%(what,))
                            message+='[%s: post failed %d times] '%(
                                domain.name,tries[domain])
                    except Exception as e:
                        logger.warning('%s: Exception caught in post: %s'%(
                            what,str(e)),exc_info=True)
                        message+='[%s: exception] '%(domain.name,)
                        raise
                elif wrfproduct.location:
                    message+='[%s (%s) unavailable] '%(
                        str(wrfproduct.location),domain.name)
                    if raiseall:
                        # Not among this module's visible imports, so
                        # import it where it is needed.  NOTE(review):
                        # assumes hwrf.exceptions defines PostHasNoInput.
                        from hwrf.exceptions import PostHasNoInput
                        raise PostHasNoInput(
                            "%s: %s: not available (should be at %s but "
                            "available=False)"%(domain.name,wrfproduct.
                                                did,wrfproduct.location))
                else:
                    message+='[%s: unavailable] '%(domain.name,)
                    if raiseall:
                        from hwrf.exceptions import PostHasNoInput
                        raise PostHasNoInput(
                            "%s: %s: not available, and location=None"%(
                                domain.name,wrfproduct.did))
            logger.info(message)
            if len(done)+len(gaveup)<count:
                if nosleep:
                    # Let the caller decide when to try again.
                    return
                logger.info('Sleep 30...')
                time.sleep(30)
                logger.info('Done sleeping.')
        if gaveup:
            logger.critical('state=FAILED')
            self.setstate(FAILED)
        else:
            logger.info('state=COMPLETED')
            self.setstate(COMPLETED)
511 
512 ########################################################################
513 
515  """!A wrapper around PostOneWRF that posts many WRF output times."""
def __init__(self,wrf,domains,conf,section,step,postclass=PostOneWRF,
             start=None,end=None,streams=['history'],
             needcrtm=True,grib=1,taskname=None,**kwargs):
    """!PostManyWRF constructor

    Determines the overall output time range of the requested streams
    over all domains, then creates one postclass subtask per output
    time (see _add_subtasks()).
    @param wrf the hwrf.fcsttask.WRFTaskBase or subclass, whose
      data is being posted
    @param domains list of domains to post, as hwrf.wrf.WRFDomain objects
    @param conf the hwrf.config.HWRFConfig that provides configuration
      information.  NOTE: not consulted here; the task is initialized
      with wrf.conf instead.
    @param section the config section in conf
    @param step time step between post input times
    @param postclass should be PostOneWRF
    @param start the first time to post, relative to the earliest
      output time; default: the earliest output time
    @param end the last time to post, relative to start; default: the
      latest output time
    @param streams the streams to consider posting, in order of
      preference.  The list is not modified.
    @param needcrtm do we need CRTM fix files?
    @param grib GRIB version: 1 or 2.  Accepted for interface
      compatibility; not used by this constructor.
    @param taskname the task name in the database
    @param kwargs additional keyword arguments passed to
      hwrf.hwrftask.HWRFTask.__init__"""
    import hwrf.exceptions
    super(PostManyWRF,self).__init__(wrf.dstore,wrf.conf,
                                     section,taskname=taskname,**kwargs)
    self._needcrtm=needcrtm
    self.__wrf=wrf
    self._subtasks=[]
    self.__done=set()
    self._postclass=postclass

    # Get the earliest start time and latest end time of the output:
    (istart,iend,iinterval)=self.wrf()[domains[0]].get_output_range(
        streams[0])
    keepstreams=list()
    for stream in streams:
        try:
            for domain in domains:
                (jstart,jend,jinterval)=self.wrf()[domain].\
                    get_output_range(stream)
                if jstart<istart: istart=jstart
                if jend>iend: iend=jend
                # Record each stream once, not once per domain.
                if stream not in keepstreams:
                    keepstreams.append(stream)
        # NOTE(review): this except line was missing from the reviewed
        # listing.  It is reconstructed on the assumption that
        # get_output_range() raises hwrf.exceptions.OutputStreamDisabled
        # for a stream a domain does not write.  Confirm.
        except hwrf.exceptions.OutputStreamDisabled:
            pass
    streams=keepstreams

    # Figure out the start and end time for data processing.  Use the
    # istart, iend just computed for start and end if they're missing:
    start = istart if start is None else start
    end = iend if end is None else end

    # Convert start and end to datetime objects:
    start=to_datetime_rel(start,istart) # start relative to istart
    end=to_datetime_rel(end,start)      # end is relative to start
    self._step=to_timedelta(step)

    # Get the WRF object's version of the domains since it will be
    # filled with additional information we need about file
    # locations, etc.:
    self.__mydomains=[self.wrf()[domain] for domain in domains]

    # Now generate the subtasks:
    self._add_subtasks(streams,start,end)
def _add_subtasks(self,streams,start,end):
    """!Generate subtasks

    Fills the self._subtasks list with (time, task) tuples in temporal
    order.  Candidate times run from start in steps of self._step while
    they are before end shifted by a tolerance of one tenth of the
    step.  A time gets a self._postclass task if, for some stream
    (checked in the given order), at least one domain has output whose
    valid time is within that tolerance.  The first such stream is
    used for the task.  Times without matching output are skipped
    with a debug message.
    @param streams the list of streams to consider, in order of preference
    @param start,end start and end of the range of times to consider"""
    interval=self._step
    epsilon=to_fraction(interval)/10
    logger=self.log()
    ende=to_datetime_rel(epsilon,end)
    when=start
    while when<ende:
        ok=False
        okstream=None
        for stream in streams:
            logger.debug(' check stream %s'%(stream))
            for domain in self.domains():
                output=domain.get_output(stream,when,logger=logger)
                if output is None:
                    continue
                logger.debug(' domain %s stream %s time %s result %s'% \
                    (repr(domain),repr(stream),repr(when),repr(output)))
                attime=output.validtime()
                dt=abs(to_fraction(attime-when,negok=True))
                if dt<epsilon:
                    logger.debug(' dt=%s-%s=%s < epsilon=%s'%(
                        repr(attime),repr(when),repr(dt),
                        repr(epsilon)))
                    okstream=stream
                    ok=True
                    break
            if ok: break
        if ok:
            taskname=self.taskname_for(when)
            logger.debug('Adding post %s with stream %s time %s'%(
                taskname,repr(okstream),repr(when)))
            self._subtasks.append( (when,
                self._postclass(self.wrftask(),
                                [x for x in self.domains()],self._conf,
                                self._section,when,stream=okstream,
                                needcrtm=self._needcrtm,
                                taskname=taskname) ) )
        else:
            logger.debug('%s: ignoring duplicate output time due to WRF '
                         'output frequency'%(when.strftime(
                             '%Y-%m-%d %H:%M:%S'),))
        when+=interval
    logger.debug('len(self._subtasks)=%s'%(repr(len(self._subtasks))))
def domains(self):
    """!Generator over the WRFDomain objects, as known to the WRF
    simulation, that every subtask posts."""
    for wrfdomain in self.__mydomains:
        yield wrfdomain
def subtasks(self):
    """!Iterate over all subtasks

    Iterator that loops over all subtasks yielding a tuple:

    @code
    (itask,rtime,subtask)
    @endcode

    Where:
    * itask = task index from 0 to ntasks-1
    * rtime = output time this task processes
    * subtask = the Task object

    Uses enumerate() rather than the Python-2-only xrange(), so it
    works on both Python 2 and Python 3."""
    for itask,(rtime,subtask) in enumerate(self._subtasks):
        yield (itask,rtime,subtask)
def unrun(self):
    """!Marks everything incomplete via uncomplete(), then undelivers
    every product this task iterates over."""
    self.uncomplete()
    for prod in self.products():
        prod.undeliver()
def uncomplete(self):
    """!Resets this task and every subtask to UNSTARTED so that all
    post-processing will be rerun.  Delivered products are left alone."""
    self.state=UNSTARTED
    for entry in self.subtasks():
        entry[2].state=UNSTARTED
def taskname_for(self,time):
    """!Builds the subtask name "<taskname>-fHHhMMm" for a given time,
    where HH and MM are the hours and minutes since the simulation start.
    @param time the time of interest"""
    simstart=self.wrf().simstart()
    hours,minutes=wrf_hr_min(time,simstart)
    return '%s-f%02dh%02dm'%(self.taskname,hours,minutes)
def subtask_count(self):
    """!Returns how many subtasks (post times) this task has."""
    entries=self._subtasks
    return len(entries)
def starttime(self):
    """!Returns the first time to be processed.

    self._subtasks holds (time, task) tuples in temporal order, so this
    is the time element of the first tuple.  (Previously the task
    element was returned by mistake.)
    @raises IndexError if there are no subtasks"""
    return self._subtasks[0][0]
def endtime(self):
    """!Returns the last time to be processed.

    self._subtasks holds (time, task) tuples in temporal order, so this
    is the time element of the last tuple.  (Previously the task
    element was returned by mistake.)
    @raises IndexError if there are no subtasks"""
    return self._subtasks[-1][0]
def wrf(self):
    """!Returns the WRF simulation object being posted, via the WRF task's wrf()."""
    wrftask=self.__wrf
    return wrftask.wrf()
def wrftask(self):
    """!Returns the Task, passed to the constructor, that ran WRF."""
    task=self.__wrf
    return task
def _run_helper(self,one):
    """!Internal implementation function: this implements run and
    runpart.  Do not call directly.

    Makes repeated passes over the subtasks in temporal order.  Each
    pass recounts COMPLETED and FAILED subtasks.  Every other subtask
    whose can_run() is True is run with nosleep=True while holding a
    per-subtask lock file in <lockdir>/<taskname>/, acquired with
    max_tries=1; if the lock is held elsewhere, the subtask is skipped
    for this pass.  A pass stops early once more than five subtasks
    cannot run yet, and the method sleeps 20 seconds between passes.
    Unexpected subtask exceptions are re-raised unless the
    ignore_errors option is true.  When every subtask is COMPLETED
    this task becomes COMPLETED; when every subtask is FAILED it
    becomes FAILED; a mix of the two is logged and the state is left
    unchanged.
    @param one True = runpart(): return after one subtask completes,
      or when too many subtasks cannot run yet.  False = run()."""
    logger=self.log()
    if self.getstate()==COMPLETED:
        return
    completed=0
    failed=0
    count=self.subtask_count()
    break_outer=False
    max_unrunable=5 # number of not-yet-runnable subtasks to look past
    lockdir=os.path.join(self.getdir('lockdir'),self.taskname)
    produtil.fileop.makedirs(lockdir)
    while not break_outer and completed+failed<count:
        # Check subtasks to see what we can run.  Counts are
        # recomputed from scratch on every pass.
        completed=0
        failed=0
        n_unrunable=0 # number seen so far that are not failed,
                      # and cannot start
        for (itask,rtime,subtask) in self.subtasks():
            state=subtask.getstate()
            if state==COMPLETED:
                completed+=1
                continue
            elif state==FAILED:
                failed+=1
                continue
            elif not subtask.can_run():
                n_unrunable+=1
                logger.debug('subtask %s cannot run yet.'%(
                    subtask.taskname,))
                if n_unrunable>max_unrunable:
                    if one: break_outer=True
                    break
                continue
            lockfile=os.path.join(lockdir,'%s.task%d'%
                                  ( rtime.strftime('%Y%m%d.%H%M%S'),itask ))
            # NOTE(review): the line opening this call was missing from
            # the reviewed listing.  Reconstructed from its surviving
            # continuation "filename=lockfile,max_tries=1)".  Confirm
            # that no other arguments (e.g. logger) were passed.
            locker=produtil.locking.LockFile(
                filename=lockfile,max_tries=1)
            try:
                with locker:
                    logger.info('run subtask %s'%(subtask.taskname,))
                    subtask.run(nosleep=True)
                    if subtask.is_completed() and one:
                        break_outer=True
                        break
            except produtil.locking.LockHeld:
                logger.info('subtask %s: lock held, moving on.'
                            %(subtask.taskname,))
            except Exception as e:
                if not self.confbool('ignore_errors',False):
                    logger.error(
                        'aborting: %s raised unexpected exception: %s'%
                        (subtask.taskname,str(e)),exc_info=True)
                    raise
                else:
                    logger.warning(
                        '%s raised unexpected exception: %s'%
                        (subtask.taskname,str(e)),exc_info=True)
        if not break_outer and completed+failed<count:
            logger.info('Sleep 20...')
            time.sleep(20)
            logger.info('done sleeping.')
    if completed==count:
        self.setstate(COMPLETED)
        self.postmsg('All %d of %d subtasks completed.'%(completed,count))
    elif failed==count:
        self.setstate(FAILED)
        logger.critical('MULTI-TASK WORKSTREAM FAILED.')
    elif completed+failed==count:
        # Every subtask finished, but not uniformly: do not guess a
        # state, but make the situation visible.
        logger.warning('%d of %d subtasks completed and %d failed; '
                       'leaving state unchanged.'%(completed,count,failed))
def run(self):
    """!Posts every input time: _run_helper in run-everything mode."""
    helper=self._run_helper
    helper(False)
def runpart(self):
    """!Posts one input time and returns: _run_helper in single-subtask mode."""
    helper=self._run_helper
    helper(True)
def products(self,time=None,**kwargs):
    """!Iterate over products.

    @param time If None, iterate over the products of every subtask.
      Otherwise time is converted with to_datetime_rel() relative to
      the simulation start.  Only the products of the first subtask
      whose time is at or after that time, by less than one tenth of
      the step, are iterated; in practice that is the subtask for
      exactly that time.  Nothing is yielded if no subtask matches.

    @param kwargs passed to the subtask to further limit the
      products iterated over. Typically, that is PostOneWRF.products()"""
    if time is None:
        for (itask,xtime,subtask) in self.subtasks():
            for product in subtask.products(**kwargs):
                yield product
    else:
        reltime=to_datetime_rel(time,self.wrf().simstart())
        epsilon=to_fraction(self._step/10)
        for (itask,xtime,subtask) in self.subtasks():
            dt=abs(to_fraction(xtime-reltime,negok=True))
            if xtime>=reltime and dt<epsilon:
                for product in subtask.products(**kwargs):
                    yield product
                # Subtask times are one step apart, so at most one
                # can match.
                break
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Definition: fileop.py:359
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
Definition: locking.py:55
def link_post_fix
Links or copies all fix files for the post to the current working directory.
Definition: post.py:109
prodname
Read-only property, an alias for getprodname(): the product name part of the database ID...
Definition: datastore.py:535
def del_post_output(self)
Deletes any post output files from the current working directory.
Definition: post.py:337
A wrapper around PostOneWRF that posts many WRF output times.
Definition: post.py:514
def _add_subtasks(self, streams, start, end)
Generate subtasks.
Definition: post.py:585
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def make_location(self, outdir)
Decides a filename which is of the format "outdir/category/prodname".
Definition: post.py:183
wrf_hr_min
References a function that converts forecast and analysis times to the number of hours and minutes be...
Definition: post.py:152
def wrf(self)
Returns the WRF object being posted.
Definition: post.py:684
def getnscenter(self)
The domain center latitude.
Definition: post.py:197
Handles file locking using Python "with" blocks.
Definition: locking.py:1
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
def _run_helper(self, one)
Internal implementation function: this implements run and runone.
Definition: post.py:690
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
Raised upon errors that would cause a retry, in the PostOneWRF.run when passed the raiseall=True argu...
Definition: exceptions.py:416
def remove_file
Deletes the specified file.
Definition: fileop.py:251
def uncomplete(self)
Marks this task and all subtasks as incomplete so that all post-processing will be rerun...
Definition: post.py:662
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def products(self, time=None, kwargs)
Iterate over products.
Definition: post.py:768
def unrun(self)
Calls uncomplete, and then deletes all products.
Definition: post.py:658
def check_post(retval, what, logger)
Did the post run successfully in the current working directory?
Definition: post.py:29
def deliver
Copies the file to its destination, and sets the CENLA and CENLO metadata values to the domain center...
Definition: post.py:158
def update(self)
Discards all cached metadata and refreshes it from the Datastore.
Definition: datastore.py:614
This is an HWRFTask that post-processes output data for a single WRF stream, from several WRF domains...
Definition: post.py:212
def product_name(self, domain)
Returns a human readable string representation of the product name for the given domain.
Definition: post.py:262
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
def __init__(self, wrf, domains, conf, section, time, stream='history', needcrtm=True, grib=1, faketime=None, taskname=None, kwargs)
PostOneWRF constructor.
Definition: post.py:216
def domains(self)
Iterates over all domains that will be processed.
Definition: post.py:325
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
def endtime(self)
Returns the last time to be processed.
Definition: post.py:681
def runpart(self)
Post one input time and return.
Definition: post.py:765
This subclass of GRIB1Op and UpstreamFile represents a GRIB1 file that is produced by an upstream wor...
Definition: regrib.py:1848
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
def taskname_for(self, time)
Returns a human-readable taskname for the given subtask time.
Definition: post.py:669
def wrftask(self)
Returns the Task that ran WRF.
Definition: post.py:687
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
location
Read-write property, an alias for getlocation() and setlocation().
Definition: datastore.py:563
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
def setstate(self, val)
Sets the state of this job.
Definition: datastore.py:1094
category
Read-only property, an alias for getcategory(), the category name part of the database ID...
Definition: datastore.py:540
Time manipulation and other numerical routines.
Definition: numerics.py:1
Raised when attempting to obtain information about when a WRF stream outputs, if the stream is disabl...
Definition: exceptions.py:326
def make_control(self, stream)
Makes the post control file for the specified stream as the file fort.14 in the local directory...
Definition: post.py:296
This class is intended to be used with the Python "with TempDir() as t" syntax.
Definition: cd.py:38
def confget
Alias for self.conf.get for section self.section.
Definition: hwrftask.py:300
def check_post(self, retval, what)
Calls check_post() to see if the post succeeded.
Definition: post.py:344
def log
Obtain a logging domain.
Definition: hwrftask.py:425
This subclass of GRIB1Op represents a GRIB1 file on disk that is ALREADY PRESENT. ...
Definition: regrib.py:1667
def set_loc_avail(self, loc, avail)
Sets the location and availability of this Datum in a single transaction.
Definition: datastore.py:583
def run
Runs the post for one forecast time and all WRF domains.
Definition: post.py:364
def subtask_count(self)
Returns the number of subtasks.
Definition: post.py:675
def getstate(self)
Returns the job state.
Definition: datastore.py:1103
def wrf(self)
Returns the WRFSimulation object.
Definition: post.py:274
Represents an E grid WRF-NMM GRIB1 file, and stores two metadata values: CENLA and CENLO which contai...
Definition: post.py:154
def __init__(self, wrf, domains, conf, section, step, postclass=PostOneWRF, start=None, end=None, streams=['history'], needcrtm=True, grib=1, taskname=None, kwargs)
PostManyWRF constructor.
Definition: post.py:518
def run(self)
Post all inputs.
Definition: post.py:762
def wrftask(self)
Returns the Task that represents the running WRF simulation.
Definition: post.py:277
def getewcenter(self)
The domain center longitude.
Definition: post.py:200
Automates locking of a lockfile.
Definition: locking.py:66
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def link_fix(self)
Calls link_post_fix() to link fix files.
Definition: post.py:315
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
Describes regribbing operations using an algebraic structure.
Definition: regrib.py:1
def call_callbacks
Calls all delivery callback functions.
Definition: datastore.py:759
def domprod(self)
Iterates over wrf domains and products.
Definition: post.py:328
def requested_time(self)
Returns the forecast time that is being processed.
Definition: post.py:322
def domains(self)
Iterates over all WRFDomain objects.
Definition: post.py:639
def subtasks(self)
Iterate over all subtasks.
Definition: post.py:643
def can_run(self)
Returns True if the needed inputs are available for the post, and False if they are not...
Definition: post.py:350
def starttime(self)
Returns the first time to be processed.
Definition: post.py:678
def make(self, regrib, args, kwargs)
Delivers the file.
Definition: post.py:188
def products(self, args, kwargs)
Iterates over products.
Definition: post.py:280