1 """!Runs the Unified Post Processor on outputs from the WRF-NMM,
2 producing E grid GRIB files as EGRIB1Product objects. """
6 __all__=[
'PostOneWRF',
'PostManyWRF',
'EGRIB1Product',
'check_post',
9 import glob,os,os.path,stat,time,math, re
30 """!Did the post run successfully in the current working directory?
32 Checks the current working directory and the specified return
33 value retval from the post to determine if the post succeeded.
34 Returns a four-element tuple (ok,cenla,cenlo,filename) where "ok"
35 is True if the post succeeded, (cenla,cenlo) is the domain center
36 and filename is the name of the post output file.
37 @param retval the post return value
38 @param what for log messages: what was posted?
39 @param logger the logging.Logger for log messages"""
44 logger.warning(
'%s:Non-zero exit status %d from post.'%(what,retval))
46 with open(
'vpost.log',
'rb')
as f:
49 readsize=min(102400,filesize)
50 f.seek(-readsize,os.SEEK_END)
51 vpostdat=f.read(readsize)
52 if(-1 == vpostdat.find(
'ALL GRIDS PROCESSED')):
54 '%s: Did not find "ALL GRIDS PROCESSED" in last %d '
55 'bytes of vpost.log (file size %d). Post probably '
56 'failed'%(what,readsize,filesize))
57 lins=vpostdat.splitlines()
58 for lin
in lins[-50:]:
59 logger.warning(
'vpost.log: %s'%(lin,))
60 return (
False,
None,
None,
None)
61 if os.path.exists(
'vpost.log'):
62 with open(
'vpost.log',
'rt')
as f:
72 '%s: this-domain-center.ksh.inc is empty or does not exist. WRF failed.'%(what,))
73 run(exe(
'ls')[
'-ltr'])
74 run(exe(
'tail')[
'-20',
'vpost.log'])
75 return(
False,
None,
None,
None)
76 with open(
'this-domain-center.ksh.inc',
'rt')
as f:
77 for line
in f.readlines(102400):
79 m=re.search(
'(clat|clon)\s*=\s*([+-]?[.0-9]+)',line)
81 (varr,where)=m.groups()
82 if varr==
'clat': cenla=float(where)
83 if varr==
'clon': cenlo=float(where)
84 except(KeyError,ValueError,TypeError,AttributeError)
as e:
88 '%s: Exception while parsing this-domain-'
89 'center.ksh.inc'%(what,),exc_info=
True)
91 logger.warning(
'%s: Could not get clat from this-domain-'
92 'center.ksh.in'%(what,))
94 logger.warning(
'%s: Could not get clon from this-domain-'
95 'center.ksh.in'%(what,))
96 ok=cenla
is not None and cenlo
is not None
98 for filename
in glob.glob(
'WRFPRS*'):
99 logger.info(
'%s: success: cenla=%s cenlo=%s file=%s.'
100 %(what,cenla,cenlo,filename))
101 return (ok,cenla,cenlo,filename)
102 logger.warning(
'%s: No WRFPRS* files found.'%(what,))
103 run(exe(
'ls')[
'-ltr'])
104 run(exe(
'tail')[
'-20',
'vpost.log'])
105 return(
False,
None,
None,
None)
110 """!Links or copies all fix files for the post to the current
112 @param fixd the fix directory
113 @param needcrtm flag: is the CRTM data needed?
114 @param logger the logging.Logger for log messages"""
116 if logger
is not None:
117 logger.info(
'%s: link post fix files here. Needcrtm=%s'%(
118 str(os.getcwd()),repr(needcrtm)))
121 for tgt
in [
'eta_micro_lookup.dat',
'nam_micro_lookup.dat',
122 'hires_micro_lookup.dat' ]:
127 src=os.path.join(fixd,
'hwrf-wrf',
'ETAMPNEW_DATA.expanded_rain')
129 preserve_perms=
False)
133 crtmd=os.path.join(fixd,
'hwrf-crtm-2.0.6')
135 for src
in (
"amsre_aqua",
"imgr_g11",
"imgr_g12",
"imgr_g13",
136 "imgr_g15",
"imgr_mt1r",
"imgr_mt2",
"seviri_m10",
137 "ssmi_f13",
"ssmi_f14",
"ssmi_f15",
"ssmis_f16",
138 "ssmis_f17",
"ssmis_f18",
"ssmis_f19",
"ssmis_f20",
139 "tmi_trmm",
"v.seviri_m10",
"imgr_insat3d" ):
140 links.append(
'%s/SpcCoeff/Big_Endian/%s.SpcCoeff.bin'%(crtmd,src))
141 links.append(
'%s/TauCoeff/Big_Endian/%s.TauCoeff.bin'%(crtmd,src))
142 for src
in (
'Aerosol',
'Emis',
'Cloud' ):
143 links.append(
'%s/%sCoeff/Big_Endian/%sCoeff.bin'%(crtmd,src,src))
144 make_symlinks_in(links,
'.',logger=logger,force=
True,
152 wrf_hr_min = fcst_hr_min
155 """!Represents an E grid WRF-NMM GRIB1 file, and stores two
156 metadata values: CENLA and CENLO which contain the domain center
158 def deliver(self,location,fileinfo,logger=None):
159 """!Copies the file to its destination, and sets the CENLA and
160 CENLO metadata values to the domain center.
161 @param location the destination location
162 @param fileinfo a dict containing fromloc, the location;
163 CENLA, the domain center latitude; and CENLO, the domain
165 @param logger a logging.Logger for log messages"""
166 assert(location
is not None)
167 dirname=os.path.dirname(location)
168 if not os.path.exists(dirname):
170 deliver_file(fileinfo[
'fromloc'],location,keep=
False,logger=logger)
171 cenla=fileinfo[
'CENLA']
172 cenlo=fileinfo[
'CENLO']
173 assert(cenla
is not None)
174 assert(cenlo
is not None)
175 with self.dstore.transaction()
as t:
180 assert(
'CENLA' in self)
181 assert(
'CENLO' in self)
184 """!Decides a filename which is of the format
185 "outdir/category/prodname".
186 @param outdir the output directory"""
188 def make(self,regrib,*args,**kwargs):
189 """!Delivers the file.
190 @returns a hwrf.regrib.GRIB1File for the file.
191 @param regrib the hwrf.regrib.Regrib with input information
192 @param args,kwargs ignored"""
194 (filename,index)=regrib.gribtemp(
'prod.'+os.path.basename(loc))
196 return GRIB1File(filename,
None,
None,self[
'CENLA'],self[
'CENLO'])
198 """!The domain center latitude."""
201 """!The domain center longitude."""
204 nscenter=property(getnscenter,
None,
None,
205 'Returns None or the center latitude of this GRIB1 file.')
207 ewcenter=property(getewcenter,
None,
None,
208 'Returns None or the center longitude of this GRIB1 file.')
213 """!This is an HWRFTask that post-processes output data for a
214 single WRF stream, from several WRF domains at a given time."""
215 def __init__(self,wrf,domains,conf,section,time,stream='history',
216 needcrtm=
True,grib=1,faketime=
None,taskname=
None,**kwargs):
217 """!PostOneWRF constructor
218 @param wrf the hwrf.fcsttask.WRFTaskBase or subclass, whose
220 @param domains list of domains to post, as hwrf.wrf.WRFDomain objects
221 @param conf the hwrf.config.HWRFConfig that provides configuration information
222 @param section the config section in conf
223 @param time the forecast time being posted
224 @param stream the WRF stream to post
225 @param needcrtm do we need CRTM fix files?
226 @param grib GRIB version: 1 or 2
227 @param faketime unused
228 @param taskname the task name in the database
229 @param kwargs additional keyword arguments passed to hwrf.hwrftask.HWRFTask.__init__"""
230 super(PostOneWRF,self).
__init__(wrf.dstore,wrf.conf,section,
231 taskname=taskname,**kwargs)
234 self.
__time=to_datetime_rel(time,wrf.wrf().simstart())
241 prodext = (
'egrb2' if(grib==2)
else 'egrb' )
243 raise NotImplementedError(
244 'GRIB2 support not yet implemented in hwrf.post')
248 assert(stream
is not None)
249 assert(time
is not None)
250 for product
in wrf.products(domains=[domain],stream=stream,
256 self.
log().debug(
'%s added %s => %s'%(self.
taskname,
257 repr(domain),repr(product)))
263 """!Returns a human readable string representation of the
264 product name for the given domain. This is used for filenames
266 @param domain the domain of interest"""
267 ext = (
'egrb2' if(self.
__grib2)
else 'egrb')
271 result=
'%s-%s.%s'%(self.
taskname,domain.name,ext)
272 result=re.sub(
'[^a-zA-Z0-9_.-]',
'_',result)
275 """!Returns the WRFSimulation object."""
276 return self.__wrf.wrf()
278 """!Returns the Task that represents the running WRF simulation."""
281 """!Iterates over products.
283 If the domain keyword is in kwargs, then only the specified
284 domain is iterated. If the domains keyword is in kwargs, then
285 that list of domains is iterated.
286 @param args,kwargs argument list"""
287 if 'domains' in kwargs:
288 for domain
in kwargs[
'domains']:
291 elif 'domain' in kwargs
and kwargs[
'domain']
in self.
__myproducts:
297 """!Makes the post control file for the specified stream as
298 the file fort.14 in the local directory
300 Figure out what control file to use from the conf section for
301 this task. We look for a stream-specific one first (ie.:
302 "auxhist2_control"). If that is missing we use a default
303 control file from the "control" variable. The control file is
304 then copied to fort.14.
305 @bug this function should use deliver_file with logging"""
306 Missing=
'**MISSING**'
308 control=self.
confget(
'%s_control'%(stream,),Missing)
309 if control
is Missing: control=self.
confstr(
'control')
310 self.
log().debug(
'Use control file %s'%(control,))
311 if os.path.exists(
'./fort.14'):
316 """!Calls link_post_fix() to link fix files."""
317 fixd=self.
getdir(
'FIXhwrf')
319 link_files=self.
confbool(
'link_wrf_fix',
True)
323 """!Returns the forecast time that is being processed."""
326 """!Iterates over all domains that will be processed."""
327 for domain
in self.
__domains:
yield domain
329 """!Iterates over wrf domains and products
331 Iterates over tuples (domain,wrfproduct,myproduct) where
332 domain is the WRF domain, wrfproduct is the WRF output Product
333 from that domain for the chosen time and myproduct is the
334 output EGRIB1Product for that time."""
338 """!Deletes any post output files from the current working
340 for filename
in glob.glob(
'./WRFPRS*'):
342 if os.path.exists(
'this-domain-center.ksh.inc'):
343 os.unlink(
'this-domain-center.ksh.inc')
345 """!Calls check_post() to see if the post succeeded
346 @param retval the post return value
347 @param what String description of what the post was run on (for log messages)
348 @returns True if the post succeeded, False otherwise"""
351 """!Returns True if the needed inputs are available for the
352 post, and False if they are not."""
353 for domain,wrfprod,myprod
in self.
domprod():
354 if myprod.available:
continue
355 if not wrfprod.available:
357 if not wrfprod.available:
359 '%s: cannot run: %s not available (loc=%s avail=%s)'%(
361 repr(wrfprod.location), repr(wrfprod.available)))
364 def run(self, nosleep=False, raiseall=False):
365 """!Runs the post for one forecast time and all WRF domains.
366 @param nosleep If True, disable sleep calls.
367 @param raiseall If True, do not catch exceptions."""
370 sync_frequently=self.
confbool(
'sync_frequently',
True)
372 if state
is COMPLETED
or state
is FAILED:
375 suffix=
'.work',logger=self.
log())
as tempdir:
376 self.
log().info(
'cwd: '+os.getcwd())
377 assert(
not re.match(
'\A/tmp',os.getcwd()))
384 while len(done)+len(gaveup)<count:
386 for domain,wrfproduct,myproduct
in self.
domprod():
388 if domain
in done
or myproduct.is_available():
390 message+=
'[%s: done] '%(domain.name,)
391 elif not wrfproduct.is_available():
396 shoudsync=now>lastsync+30
398 logger.info(
'Calling sync...')
404 run(exe(
'sync'),logger=logger)
405 logger.info(
'Returned from sync.')
407 lastsync=int(time.time())
412 elif domain
in tries
and tries[domain]>=5:
414 elif wrfproduct.is_available():
421 make_symlink(wrfproduct.location,
'./INFILE',
422 force=
True,logger=logger)
424 wrf_base=os.path.basename(wrfproduct.location)
426 '(\d\d\d\d.\d\d.\d\d.\d\d.\d\d.\d\d)',
428 if match
and len(match.groups())>0:
430 datestamp=match.groups()[0]
432 datestamp=self.__time.strftime(
440 '%s: %s: cannot get datestamp from this '
441 'name; will guess %s'%\
442 (what,wrf_base,datestamp))
444 datestamp = datestamp[0:4]+
'-'+datestamp[5:7]+\
445 '-'+datestamp[8:10]+
'_'+datestamp[11:13]+\
446 ':'+datestamp[14:16]+
':'+datestamp[17:19]
447 cmd = mpirun(mpi(self.
getexe(
'post')),
448 allranks=
True) >
'vpost.log'
449 logger.info(
'Post command: %s'%(repr(cmd),))
451 with open(
'itag',
'wt')
as itag:
457 run(batchexe(
'sync'))
458 if self.
confbool(
'sync_frequently',
True):
464 logger.info(
'%s: deliver'%(what,))
465 myproduct.deliver(myproduct.make_location(
466 outdir),{
'CENLA':cenla,
'CENLO':cenlo,
467 'fromloc':filename},logger=logger)
469 message+=
'[%s: just posted] '%(domain.name,)
471 msg=
'%s: failed'%(what,)
475 logger.info(
'%s: failed'%(what,))
476 message+=
'[%s: post failed %d times] '%(
477 domain.name,tries[domain])
478 except Exception
as e:
479 logger.warning(
'%s: Exception caught in post: %s'%(
480 what,str(e)),exc_info=
True)
481 message+=
'[%s: exception] '%(domain.name,)
483 elif wrfproduct.location:
484 message+=
'[%s (%s) unavailable] '%(
485 str(wrfproduct.location),domain.name)
487 raise PostHasNoInput(
488 "%s: %s: not available (should be at %s but "
489 "available=False)"%(domain.name,wrfproduct.
490 did,wrfproduct.location))
492 message+=
'[%s: unavailable] '%(domain.name,)
494 raise PostHasNoInput(
495 "%s: %s: not available, and location=None"%(
496 domain.name,wrfproduct.did))
499 if len(done)+len(gaveup)<count:
500 logger.info(
'Sleep 30...')
502 logger.info(
'Done sleeping.')
506 logger.critical(
'state=FAILED')
509 logger.info(
'state=COMPLETED')
515 """!A wrapper around PostOneWRF that posts many WRF output times."""
516 def __init__(self,wrf,domains,conf,section,step,postclass=PostOneWRF,
517 start=
None,end=
None,streams=[
'history'],
518 needcrtm=
True,grib=1,taskname=
None,**kwargs):
519 """!PostManyWRF constructor
521 @param wrf the hwrf.fcsttask.WRFTaskBase or subclass, whose
523 @param domains list of domains to post, as hwrf.wrf.WRFDomain objects
524 @param conf the hwrf.config.HWRFConfig that provides configuration information
525 @param section the config section in conf
526 @param step time step between post input times
527 @param postclass should be PostOneWRF
528 @param start the first time to post
529 @param end the last time to post
530 @param streams the streams to consider posting
531 @param needcrtm do we need CRTM fix files?
532 @param grib GRIB version: 1 or 2
533 @param taskname the task name in the database
534 @param kwargs additional keyword arguments passed to PostOneWRF.__init__ """
535 super(PostManyWRF,self).
__init__(wrf.dstore,wrf.conf,
536 section,taskname=taskname,**kwargs)
545 (istart,iend,iinterval)=self.
wrf()[domains[0]].get_output_range(
548 for stream
in streams:
550 for domain
in domains:
551 (jstart,jend,jinterval)=self.
wrf()[domain].\
552 get_output_range(stream)
553 if jstart<istart: istart=jstart
554 if jend>iend: iend=jend
555 keepstreams.append(stream)
565 start = istart
if start
is None else start
566 end = iend
if end
is None else iend
570 start=to_datetime_rel(start,istart)
571 end=to_datetime_rel(end,start)
572 interval=to_timedelta(interval)
585 def _add_subtasks(self,streams,start,end):
586 """!Generate subtasks
588 Fills the self._subtasks array. It figures out which times
589 have data for all domains and creates a PostOneWRF object for
590 each of those times. The subtasks are created in temporal
592 @param streams the list of streams to consider
593 @param start,end start and end of the range of times to consider"""
596 epsilon=to_fraction(interval)/10
599 ende=to_datetime_rel(epsilon,end)
602 for stream
in streams:
603 logger.debug(
' check stream %s'%(stream))
607 this[key]=domain.get_output(stream,when,logger=self.
log())
608 if this[key]
is None:
610 logger.debug(
' domain %s stream %s time %s result %s'% \
611 (repr(domain),repr(stream),repr(when),repr(this[key])))
612 attime=this[key].validtime()
613 dt=abs(to_fraction(attime-when,negok=
True))
615 logger.debug(
' dt=%s-%s=%s > epsilon=%s'%(
616 repr(attime),repr(when),repr(dt),
623 logger.debug(
'Adding post %s with stream %s time %s'%(
626 self._subtasks.append( (when,
634 logger.debug(
'%s: ignoring duplicate output time due to WRF '
635 'output frequency'%(when.strftime(
636 '%Y-%m-%d %H:%M:%S'),))
638 self.
log().debug(
'len(self._subtasks)=%s'%(repr(len(self.
_subtasks))))
640 """!Iterates over all WRFDomain objects."""
644 """!Iterate over all subtasks
646 Iterator that loops over all subtasks yielding a tuple:
649 (itask,rtime,subtask)
653 * itask = task index from 0 to ntasks-1
654 * rtime = output time this task processes
655 * subtask = the Task object"""
659 """!Calls uncomplete, and then deletes all products."""
661 for product
in self.
products(): product.undeliver()
663 """!Marks this task and all subtasks as incomplete so that all
664 post-processing will be rerun. Does not undeliver any
665 delivered products."""
667 for itask,rtime,subtask
in self.
subtasks():
668 subtask.state=UNSTARTED
670 """!Returns a human-readable taskname for the given subtask
672 @param time the time of interest"""
674 return '%s-f%02dh%02dm'%(self.
taskname,ihours,iminutes)
676 """!Returns the number of subtasks."""
679 """!Returns the first time to be processed"""
682 """!Returns the last time to be processed"""
685 """!Returns the WRF object being posted"""
686 return self.__wrf.wrf()
688 """!Returns the Task that ran WRF"""
690 def _run_helper(self,one):
691 """!Internal implementation function: this implements run and
692 runone. Do not call directly.
693 @param one True = runone(), False=run()"""
703 while not break_outer
and completed+failed<count:
710 for (itask,rtime,subtask)
in self.
subtasks():
711 state=subtask.getstate()
720 elif not subtask.can_run():
722 logger.debug(
'substask %s cannot run yet.'%(
724 if n_unrunable>max_unrunable:
725 if one: break_outer=
True
728 lockfile=os.path.join(lockdir,
'%s.task%d'%
729 ( rtime.strftime(
'%Y%m%d.%H%M%S'),itask ))
731 filename=lockfile,max_tries=1)
734 logger.info(
'run subtask %s'%(subtask.taskname,))
735 subtask.run(nosleep=
True)
736 if subtask.is_completed()
and one:
740 logger.info(
'subtask %s: lock held, moving on.'
741 %(subtask.taskname,))
742 except Exception
as e:
743 if not self.
confbool(
'ignore_errors',
False):
745 'aborting: %s raised unexpected exception: %s'%
746 (subtask.taskname,str(e)),exc_info=
True)
750 '%s raised unexpected exception: %s'%
751 (subtask.taskname,str(e)),exc_info=
True)
752 if not break_outer
and completed+failed<count:
753 logger.info(
'Sleep 20...')
755 logger.info(
'done sleeping.')
758 self.
postmsg(
'All %d of %d subtasks completed.'%(completed,count))
761 logger.critical(
'MULTI-TASK WORKSTREAM FAILED.')
763 """!Post all inputs."""
766 """!Post one input time and return."""
769 """!Iterate over products.
771 @param time Only iterate over this time's products. The time
772 that is actually iterated is the last time that is not before
775 @param kwargs passed to the subtask to further limit the
776 products iterated over. Typically, that is PostOneWRF.products()"""
778 for (itask,xtime,subtask)
in self.
subtasks():
779 for product
in subtask.products(**kwargs):
782 reltime=to_datetime_rel(time,self.
wrf().simstart())
783 epsilon=to_fraction(self.
_step/10)
784 for (itask,xtime,subtask)
in self.
subtasks():
785 dt=abs(to_fraction(xtime-reltime,negok=
True))
786 if xtime>=reltime
and dt<epsilon:
787 for product
in subtask.products(**kwargs):
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
def link_post_fix
Links or copies all fix files for the post to the current working directory.
prodname
Read-only property, an alias for getprodname(): the product name part of the database ID...
def del_post_output(self)
Deletes any post output files from the current working directory.
A wrapper around PostOneWRF that posts many WRF output times.
def _add_subtasks(self, streams, start, end)
Generate subtasks.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def make_location(self, outdir)
Decides a filename which is of the format "outdir/category/prodname".
wrf_hr_min
References a function that converts forecast and analysis times to the number of hours and minutes be...
def wrf(self)
Returns the WRF object being posted.
def getnscenter(self)
The domain center latitude.
Handles file locking using Python "with" blocks.
taskname
Read-only property: the name of this task.
def _run_helper(self, one)
Internal implementation function: this implements run and runone.
The base class of tasks run by the HWRF system.
Raised upon errors that would cause a retry, in the PostOneWRF.run when passed the raiseall=True argu...
def remove_file
Deletes the specified file.
def uncomplete(self)
Marks this task and all subtasks as incomplete so that all post-processing will be rerun...
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def products(self, time=None, kwargs)
Iterate over products.
def unrun(self)
Calls uncomplete, and then deletes all products.
def check_post(retval, what, logger)
Did the post run successfully in the current working directory?
def deliver
Copies the file to its destination, and sets the CENLA and CENLO metadata values to the domain center...
def update(self)
Discards all cached metadata and refreshes it from the Datastore.
This is an HWRFTask that post-processes output data for a single WRF stream, from several WRF domains...
def product_name(self, domain)
Returns a human readable string representation of the product name for the given domain.
def confbool
Alias for self.conf.getbool for section self.section.
def __init__(self, wrf, domains, conf, section, time, stream='history', needcrtm=True, grib=1, faketime=None, taskname=None, kwargs)
PostOneWRF constructor.
def domains(self)
Iterates over all domains that will be processed.
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
outdir
The directory in which this task should deliver its final output.
def endtime(self)
Returns the last time to be processed.
def runpart(self)
Post one input time and return.
This subclass of GRIB1Op and UpstreamFile represents a GRIB1 file that is produced by an upstream wor...
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
def taskname_for(self, time)
Returns a human-readable taskname for the given subtask time.
def wrftask(self)
Returns the Task that ran WRF.
Stores products and tasks in an sqlite3 database file.
location
Read-write property, an alias for getlocation() and setlocation().
def makedirs
Make a directory tree, working around filesystem bugs.
def setstate(self, val)
Sets the state of this job.
category
Read-only property, an alias for getcategory(), the category name part of the database ID...
Time manipulation and other numerical routines.
Raised when attempting to obtain information about when a WRF stream outputs, if the stream is disabl...
def make_control(self, stream)
Makes the post control file for the specified stream as the file fort.14 in the local directory...
This class is intended to be used with the Python "with TempDir() as t" syntax.
def confget
Alias for self.conf.get for section self.section.
def check_post(self, retval, what)
Calls check_post() to see if the post succeeded.
def log
Obtain a logging domain.
This subclass of GRIB1Op represents a GRIB1 file on disk that is ALREADY PRESENT. ...
def set_loc_avail(self, loc, avail)
Sets the location and availability of this Datum in a single transaction.
def run
Runs the post for one forecast time and all WRF domains.
def subtask_count(self)
Returns the number of subtasks.
def getstate(self)
Returns the job state.
def wrf(self)
Returns the WRFSimulation object.
Represents an E grid WRF-NMM GRIB1 file, and stores two metadata values: CENLA and CENLO which contai...
def __init__(self, wrf, domains, conf, section, step, postclass=PostOneWRF, start=None, end=None, streams=['history'], needcrtm=True, grib=1, taskname=None, kwargs)
PostManyWRF constructor.
def run(self)
Post all inputs.
def wrftask(self)
Returns the Task that represents the running WRF simulation.
def getewcenter(self)
The domain center longitude.
Automates locking of a lockfile.
Exceptions raised by the hwrf package.
def link_fix(self)
Calls link_post_fix() to link fix files.
def confstr
Alias for self.conf.getstr for section self.section.
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Describes regribbing operations using an algebraic structure.
def call_callbacks
Calls all delivery callback functions.
def domprod(self)
Iterates over wrf domains and products.
def requested_time(self)
Returns the forecast time that is being processed.
def domains(self)
Iterates over all WRFDomain objects.
def subtasks(self)
Iterate over all subtasks.
def can_run(self)
Returns True if the needed inputs are available for the post, and False if they are not...
def starttime(self)
Returns the first time to be processed.
def make(self, regrib, args, kwargs)
Delivers the file.
def products(self, args, kwargs)
Iterates over products.