1 """!This module contains Tasks to run the WRF Preprocessing System 
    2 (WPS): Geogrid, Ungrib and Metgrid.""" 
    8 import os, shutil, collections, glob, time, math, re, itertools
 
    9 import string, urlparse, datetime, collections
 
   19 from hwrf.exceptions    import WPSError, UngribSubsetError, GeogridNoOutput, \
 
   20     TimestepTooShort, GeogridNoLog, UngribNoInput, UngribInputUnknown
 
   21 from hwrf.numerics      import partial_ordering, TimeArray, to_timedelta, \
 
   22     to_datetime_rel, to_fraction, to_datetime
 
   27 from collections        
import defaultdict
 
   70                     'opt_geogrid_tbl_path',
 
   81                     'opt_metgrid_tbl_path',
 
   91     """!This subclass of HWRFTask represents a WPS Task.  Multiple WPS 
   92     jobs run in the same work directory.  This class allows the jobs 
   93     to be represented together as a set, with one work directory 
   94     specified at the top level.  This class exists only to reduce code 
   95     duplication by letting Metgrid, Geogrid and Ungrib share a 
   97     def __init__(self, dstore, conf, section, sim, domains, taskname=None, 
 
   98                  wpsdir=
None, starttime=
None, increment=
None, endtime=
None,
 
   99                  parent_atime=
None, geogrid_from=
None, **kwargs):
 
  100         """!Create a new WPSTask. 
  102         @param dstore The produtil.datastore.Datastore for database information 
  103         @param conf the hwrf.config.HWRFConfig for configuration info 
  104         @param section the section to use in conf 
  105         @param sim The "sim" is the 
  106         WRFSimulation for which we are preparing input.   
  107         @param domains The domains must be a list of WRFDomain 
  109         @param taskname  Optional: the name of this Task.  Default: config section name. 
  110         @param wpsdir Optional: the directory in which to run WPS. 
  111              Default: taskname subdirectory of conf.getdir("WORKhwrf") 
  113         @param starttime,endtime Optional: simulation length.  Default: same as sim. 
  114         @param increment Optional: boundary input interval.  Default: get from sim. 
  115         @param parent_atime Optional: analysis time of parent model.  Default: starttime 
  116         @param geogrid_from Optional: a Geogrid object whose output 
  117              should be linked before running this WPS step.  This is 
  118              used when the WPS Ungrib and Metgrid are run more than 
  119              once on the same domain.  For example, one may run 
  120              multiple analysis times or multiple forecast lengths off 
  121              of the same geogrid output. 
  122         @param kwargs Other options are sent to the hwrf.hwrftask.HWRFTask.__init__ constructor. 
  124         if taskname 
is None: taskname=section
 
  125         super(WPSTask,self).
__init__(dstore,conf,section,taskname,**kwargs)
 
  129         self.
_domains=[sim[domain] 
for domain 
in domains]
 
  132         if geogrid_from 
is not None:
 
  133             if not isinstance(geogrid_from,Geogrid):
 
  135                     'The geogrid_from parameter to WPSTask.__init__ must ' 
  136                     'be a Geogrid.  You provided a %s %s.' 
  137                     %(type(geogrid_from).__name__,repr(geogrid_from)))
 
  140         if starttime 
is None:  starttime=sim.simstart()
 
  142         if endtime 
is None: endtime=sim.simend()
 
  143         self.
endtime=to_datetime_rel(endtime,starttime)
 
  144         self.
increment=increment 
if increment 
is not None else 6*3600
 
  146         if parent_atime 
is None:
 
  147             parent_atime=starttime
 
  150             raise TimestepTooShort(
 
  151                 'Geogrid timestep %s is smaller than 300 seconds.' 
  153         self.
log().debug(
'%s times: start=%s end=%s increment=%s'%(
 
  174     def _guess_nocolons(self): 
 
  175         """!Guesses whether we should use colons in filenames.  This 
  176         information is obtained from sim.get_nocolons().  Note that 
  177         this requires re-scanning all input and output io_forms, so 
  178         this function is moderately expensive.""" 
  179         nc=self._sim.get_nocolons()
 
  185     nocolons=property(_guess_nocolons,
None,
None, \
 
  186         """A property that is True if WPS will omit colons from the 
  187         output filenames, and False otherwise.""")
 
  190         """!Guesses the file suffix (.nc, etc.) for the specified stream. 
  192         Tries to guess what io suffix WPS will use for a stream.  If 
  193         the stream is not one of the known streams, this function logs 
  194         a warning and tries to continue, but will likely fail. 
  196         @param stream the stream of interest. 
  198         if(stream==
'metgrid' or stream==
'auxinput1'):
 
  199             return self._sim.get_io_suffix(
'auxinput1')
 
  200         elif(stream==
'geogrid' or stream==
'auxinput2'):
 
  201             return self._sim.get_io_suffix(
'auxinput2')
 
  204                 'caller requested unknown stream "%s" in WPSTask.io_suffix' 
  206             return self._sim.get_io_suffix(
'input')
 
  209         """!The number of WRF domains, and highest domain number.""" 
  210         return self._sim.maxdom()
 
  214     maxdom=property(_maxdom,
None,
None,
"The number of WRF domains.")
 
  216         """!The mother of all domains as a WRFDomain.""" 
  217         return self._sim.get(1)
 
  221     MOAD=property(_MOAD,
None,
None,
"Returns the Mother of All Domains (MOAD)")
 
  224         """!The WRFSimulation object for which we are preparing input.""" 
  228         """!Checks to see if the geogrid MOAD output file is present 
  229         and non-empty in the current working directory.  Raises 
  230         GeogridNoOutput if the output is missing or empty.""" 
  232         filename=
'geo_nmm.d01.%s'%(suffix,)
 
  238         """!Links geogrid output from the specified Geogrid task to the 
  239         current working directory.  Will raise an exception if Geogrid 
  240         has not produced its outputs yet.  This is used when multiple 
  241         Metgrid executions are used with the same Geogrid.  The 
  242         geogrid only needs to be run once since its outputs do not 
  243         change with time, just with domain configuration. 
  245         Specifically, this finds all Product objects in task 
  246         self._geogrid_from whose prodname contains "geo" and links 
  247         from the product's location to its prodname in the current 
  248         working directory.  This logic must match the logic in 
  249         Geogrid.make_products and WPSTask.check_geogrid.""" 
  254             logger.info(
'Not linking geogrid inputs.  I hope you already ' 
  255                         'ran geogrid in this directory.')
 
  258             logger.info(
'Task %s linking geo products from %s task %s' 
  259                         %(self.
taskname,type(fromtask).__name__,
 
  262         for product 
in fromtask.products():
 
  264             name=product.prodname
 
  265             if name.find(
'geo')>=0:
 
  266                 logger.info(did+
': should copy to '+name)
 
  268                 logger.info(did+
': does not contain "geo" so skipping this.')
 
  276                 msg=did+
': product is not available' 
  277             elif loc 
is None or loc==
'':
 
  278                 msg=did+
': product has no location (but is available)' 
  279             elif not os.path.exists(loc):
 
  280                 msg=did+
': file does not exist: '+repr(loc)
 
  282                 msg=did+
': file is empty: '+repr(loc)
 
  288                 loc,name,force=
True,logger=logger)
 
  290         """!Iterates over all output times.""" 
  300     sim=property(getsim,
None,
None,
 
  301                  """Returns the WRF simulation for this WPSTask""")
 
  303         """!Iterates over the domains in this WPSTask""" 
  304         for domain 
in self.
_domains: 
yield domain
 
  306         """!Links all fix files for ungrib to the current working directory. 
  307         @param geog_data if True, also link the geog-data directory 
  308         @param table name of the table file symbolic link""" 
  314                 logger.info(
'Link table file %s to %s'%(tblhere,tbl))
 
  315                 make_symlink(tbl,tblhere,force=
True,logger=logger)
 
  317                 make_symlink(self.
getdir(
'geog_data'),
'geog-data',force=
True,
 
  319         except Exception 
as e:
 
  321             logger.warning(
'cannot link to WPS fix files: '+str(e))
 
  324         """!Generates the self._products data structure used by the 
  325         products method.  Should be called by make_products in 
  336         """!This subroutine should be implemented in subclasses.  It 
  337         should call self.make_product_structure(), and then add 
  340           self._products[domain.moad_ratio()][time]=product 
  342         where the domain is a WRFDomainBase subclass, the time is 
  343         anything accepted by to_datetime_rel's first argument, and the 
  344         product is any subclass of Product.""" 
  346         """!Iterates over all products 
  347         @param domain the hwrf.wrf.WRFDomain 
  349         @note One, both or neither argument can be specified.  All matching 
  350         products are yielded.""" 
  351         if self._products 
is not None:
 
  352             if domain 
is not None and domain.moad_ratio() 
in self._products:
 
  353                 tprod=self._products[domain.moad_ratio()]
 
  355                     for (time,product) 
in tprod.itervalues(): 
 
  357                         assert(localprod 
is not None)
 
  360                     localprod=tprod[time]
 
  361                     assert(localprod 
is not None)
 
  366                 for tprod 
in self._products.itervalues():
 
  367                     for thetime,product 
in tprod.iteritems():
 
  370                                 thetime,time,epsilon):
 
  374                         assert(product 
is not None)
 
  378         """!Returns the namelist.wps contents as a string.""" 
  379         return self.nl.remove_traits().set_sorters(_wps_namelist_order,
 
  382         """!Deletes all delivered products and marks them as 
  384         @param time,domain passed to products() to get the list of products 
  385         @param fromloc Ignored.""" 
  388         prodlist=[p 
for p 
in self.
products(time=time,domain=domain)]
 
  390             p.undeliver(logger=logger)
 
  392                          keep=
True,relink=
False):
 
  393         """!This is called from self.run to deliver all products to the 
  396         @param time,domain The optional time and domain arguments 
  397         are sent to self.products.   
  398         @param fromloc By default, this routine assumes that the file 
  399         to be delivered is in the current working directory with the 
  400         same name as the destination file.  To change that, give a 
  401         lambda function in "fromloc", which converts the destination 
  402         filename (the only argument) to a local filename.   
  403         @param keep The "keep" argument has the same meaning as in 
  404         deliver_file: if False, the file may be moved to the 
  406         @param relink If True, and the file is moved, then a symlink 
  407         will be made from the original file location to the 
  409         link_files=self.
confbool(
'link_wrf_fix',
True)
 
  410         keep=keep 
and link_files
 
  415         if fromloc 
is None: fromloc=os.path.basename
 
  416         prodlist=[p 
for p 
in self.
products(time=time,domain=domain)]
 
  420                 if f 
is None or len(f)<1:
 
  422                     msg=
'%s: product has no location; cannot deliver it'%(
 
  427                 assert(isinstance(fl,basestring)) 
 
  429                 p.deliver(frominfo=fl,keep=keep,logger=logger)
 
  430                 if not keep 
and not os.path.exists(fl):
 
  433                     logger.info(
'%s: file was moved to destination %s' 
  434                                 %(linkfile,destfile))
 
  436                         logger.info(
'%s: relink from delivered file %s' 
  437                                     %(linkfile,destfile))
 
  438                         make_symlink(destfile,linkfile,force=
True,
 
  441                         logger.info(
'%s: not relinking.  File now at %s' 
  442                                     %(linkfile,destfile))
 
  443             except EnvironmentError 
as e:
 
  444                 logger.warning(
'%s: cannot deliver file'%(f,),exc_info=
True)
 
  447             logger.critical(
'some output files were empty or missing; ' 
  449     def _wps_namelist(self):
 
  450         """!Fills the self.nl namelist with correct information.  This 
  451         must be called before make_namelist, and hence it is called 
  452         from the constructor.""" 
  454         siu = self.nl.nl_set_if_unset
 
  455         t = self.nl.trait_get
 
  457         start = self.starttime.strftime(
"%Y-%m-%d_%H:%M:%S")
 
  460         s(
'share', 
'max_dom', maxdom)
 
  461         s(
'share', 
'start_date', start)
 
  462         s(
'share', 
'end_date', end)
 
  463         s(
'share', 
'interval_seconds', int(self._conf.get(
'wrf',
'bdystep')))
 
  464         io_form_geogrid=int(self.sim.io_form_for(
'auxinput2'))%100
 
  465         io_form_metgrid=int(self.sim.io_form_for(
'auxinput1'))%100
 
  466         if io_form_metgrid==11: io_form_metgrid=2
 
  467         if io_form_geogrid==11: io_form_geogrid=2
 
  468         s(
'share', 
'io_form_geogrid', io_form_geogrid)
 
  469         s(
'metgrid', 
'io_form_metgrid', io_form_metgrid)
 
  483         resolution = 
'10m' if (maxdom<3) 
else '2m' 
  489                 dx.append(d.nl.trait_get(
'dx'))
 
  490                 dy.append(d.nl.trait_get(
'dy'))
 
  494                 pid.append(d.parent.get_grid_id())
 
  495                 pgr.append(d.nl.trait_get(
'parent_grid_ratio'))
 
  496                 start=str(d.nl.trait_get(
'start',
'auto')).lower()
 
  498                     istart.append(int(d.nl.trait_get(
'istart')))
 
  499                     jstart.append(int(d.nl.trait_get(
'jstart')))
 
  504             gid.append(d.get_grid_id())
 
  505             nx.append(d.nl.trait_get(
'nx'))
 
  506             ny.append(d.nl.trait_get(
'ny'))
 
  507             res.append(resolution)
 
  512         s  (
'geogrid', 
'parent_id', pid)
 
  513         s  (
'geogrid', 
'parent_grid_ratio', pgr)
 
  514         s  (
'geogrid', 
'e_we', nx)
 
  515         s  (
'geogrid', 
'e_sn', ny)
 
  516         s  (
'geogrid', 
'dx', dx)
 
  517         s  (
'geogrid', 
'dy', dy)
 
  518         s  (
'geogrid', 
'i_parent_start', istart)
 
  519         siu(
'geogrid', 
'geog_data_res', res)
 
  520         siu(
'geogrid', 
'map_proj',
'rotated_ll')
 
  521         s  (
'geogrid', 
'geog_data_path', 
'./geog-data/')
 
  522         s  (
'geogrid', 
'opt_geogrid_tbl_path',
'./')
 
  523         s  (
'geogrid', 
'j_parent_start', jstart)
 
  524         s  (
'geogrid', 
'ref_lat', self.
conffloat(
'domlat',section=
'config'))
 
  525         s  (
'geogrid', 
'ref_lon', self.
conffloat(
'domlon',section=
'config'))
 
  527         s  (
'metgrid', 
'opt_metgrid_tbl_path',
'./')
 
  528         s  (
'metgrid', 
'fg_name',
'FILE')
 
  530         s  (
'ungrib',  
'prefix',
'FILE')
 
  531         siu(
'ungrib',  
'out_format',
'WPS')
 
  533         siu(
'share',   
'wrf_core',
'NMM')
 
  537     """!This is a HWRF task that pre processes the geogrid to define the 
  538     model domains and interpolates static geographical data to the 
  542         """!Creates a new Geogrid.   
  543         @param args,kwargs All arguments are passed to the 
  544         WPSTask.__init__() constructor.""" 
  545         super(Geogrid, self).
__init__(*args,**kwargs)
 
  548         """!Creates the FileProduct objects for this Geogrid. 
  550         @note Implementation note: If you change the list of products, 
  551         make sure all geogrid outputs needed as input to ungrib or 
  552         metgrid have a prodname that is the same as the destination 
  560                 f = 
"geo_nmm.d%02d.%s" %(id, suffix)
 
  562                 f = 
"geo_nmm_nest.l%02d.%s" %(id -1, suffix)
 
  563             dest=os.path.join(self.
outdir,f)
 
  565                  meta={
"domain": d.name}, location=dest)
 
  567             self.
log().debug(
'geogrid made product %s with location %s' 
  568                              %(repr(prod.did),repr(prod.location)))
 
  569             self.
_products[d.moad_ratio()][time]=prod
 
  571         """!Returns the FileProduct for the geogrid data for the 
  572         specified nesting ratio.  The specified domain does not have 
  573         to be one of the known domains.  It just has to have the same 
  574         nest:parent ratio (WRFDomain.moad_ratio) as one. 
  575         @param dom The hwrf.wrf.WRFDomain of interest.""" 
  576         ratio=dom.moad_ratio()
 
  580         """!Copies inputs, links fix files, runs geogrid and delivers 
  586                 logger.info(
'Geogrid running in directory: '+os.getcwd())
 
  587                 assert(
not re.match(
'\A/tmp',os.getcwd()))
 
  589                 for f 
in glob.glob(
'geo*'):
 
  592                     except(EnvironmentError) 
as e:
 
  593                         logger.warning(
'%s: did not remove file, but ' 
  594                                        'continuing anyway'%(f,))
 
  595                 self.
link_fix(geog_data=
True,table=
'GEOGRID.TBL')
 
  597                 with open(
'namelist.wps', 
'w') 
as f:
 
  600                 prog = self.
getexe(
'hwrf_geogrid')
 
  605                 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
 
  607                 findme=
"Successful completion" 
  609                 for glog 
in ( 
'geogrid.log', 
'geogrid.log.0000', 
 
  610                               'geogrid.log.00000' ):
 
  611                     if os.path.exists(glog):
 
  614                         logger.info(
'%s: does not exist.'%(glog,))
 
  615                 if geogrid_log 
is None:
 
  616                     msg=
'WPS Geogrid failed: could not find geogrid log file.' 
  619                     raise GeogridNoLog(msg)
 
  620                 logger.info(
'%s: will check for %s'%(geogrid_log,findme))
 
  621                 if not check_last_lines(geogrid_log,findme):
 
  622                     raise WPSError(
'%s: did not find "%s"' 
  623                                    %(geogrid_log,findme))
 
  626         except WPSError 
as we:
 
  629         except Exception 
as e:
 
  630             logger.critical(
'WPS Geogrid failed: '+str(e),exc_info=
True)
 
  634         self.
postmsg(
'WPS Geogrid completed.')
 
  638     """!This is a HWRF task that extracts the meteorological fields 
  639     from GRIB formatted files and writes the fields to intermediate 
  640     files.  It is a wrapper around the WPS ungrib.exe program.""" 
  643         """!Creates a new Ungrib.  All arguments are passed to the 
  644         WPSTask constructor.  The only new arguments are: 
  646         * in_ftime - Optional: the parent model forecast hour to use 
  647             for analysis time data in this Ungrib. 
  649         * in_item, in_dataset - Optional: the item and dataset, in 
  650             hwrf.input terminology, to use to request the GRIB1/2 
  653         * in_item2, in_dataset2 - Optional: same as in_item and 
  654             in_dataset, but for a second GRIB file that is appended to 
  657         * inputs - Optional: an hwrf.input.DataCatalog to provide 
  658             input file locations.  Default: construct one using the 
  659             catalog name from the "catalog" option in this object's 
  662         @param args,kwargs passed to WPSTask.__init__        """ 
  663         super(Ungrib, self).
__init__(*args,**kwargs)
 
  664         if 'inputs' in kwargs 
and kwargs[
'inputs']:
 
  667             hd=self.
confstr(
'catalog',
'hwrfdata')
 
  669                 self.
conf,hd,self.sim.simstart())
 
  670         self.
__one_time=bool(kwargs.get(
'one_time',
False))
 
  673         in_atime=kwargs[
'in_atime'] if(
'in_atime' in kwargs) 
else \
 
  675         if in_atime 
is None or in_atime==
'':
 
  678             self.
in_atime=to_datetime_rel(in_atime,self.sim.simstart())
 
  681         in_ftime=kwargs[
'in_ftime'] if(
'in_ftime' in kwargs) 
else None 
  682         if in_ftime 
is None: in_ftime=self.
confint(
'ftime',0)
 
  683         if isinstance(in_ftime,datetime.datetime):
 
  684             in_ftime=in_ftime-in_atime
 
  685         if not isinstance(in_ftime,int) 
and not isinstance(in_ftime,float) \
 
  686                 and not isinstance(in_ftime,datetime.timedelta):
 
  687             raise TypeError(
'in_ftime must be an int, a float or a ' 
  688                             'timedelta, not a %s (in_ftime=%s)'%(
 
  689                             in_ftime.__class__.__name__,repr(in_ftime)))
 
  693                 'in_dataset',self.
confstr(
'dataset',
'hwrfdata')))
 
  697                 'in_item',self.
confstr(
'item',
'gfs_pgrb')))
 
  699                 'in_item2',self.
confstr(
'item2',
'')))
 
  700         self.
log().debug(
'self.in_item=%s dataset=%s section=%s'%(
 
  736         """!If True, we are pretending that hour 0 is valid for all 
  737         times.  This is equivalent to constant boundary conditions. 
  738         If in_ftime is non-zero, then that hour is used instead.""" 
  743         """!How many grib files are processed for each time?  This is 2 
  744         if an item2 or in_item2 were given in the config or 
  745         constructor, respectively.  Otherwise it is 1.""" 
  751         """!Iterates over all input files needed.  This is meant to be 
  752         plugged in to an hwrf.input.InputSource to obtain input data 
  753         in the scripts.exhwrf_input job.""" 
  755         for t 
in self.
times():
 
  758             ftime=to_datetime_rel(dt,to_datetime_rel(self.
in_ftime,
 
  765                        optional=self.
confbool(
'item2_optional',
False))
 
  768         """!Finds input data for a specific time and GRIB file 
  769         @param dt the forecast time as a datetime.datetime, relative  
  771         @param igrib 1 or 2, which input file is used (in_item or in_item2). 
  772           This is needed for models like GFS and GEFS that split their 
  773           GRIB files into two parts. 
  774         @param require if True, then hwrf.exceptions.UngribNoInput is 
  775           raised when inputs cannot be found.""" 
  780         ftime=to_datetime_rel(dt,to_datetime_rel(self.
in_ftime,
 
  782         stratime=self.in_atime.strftime(
"%Y%m%d%H")
 
  783         logger.info(
"Check for dataset=%s item=%s ftime=%s atime=%s in %s"%(
 
  784                 dataset, item, ftime.strftime(
"%Y%m%d%H"), stratime, 
 
  786         logger.debug(
'inputs: '+repr(self.
inputs))
 
  795         self.
log().info(
"Got back: "+repr(ret))
 
  796         if require 
and (ret 
is None or ret==
''):
 
  798                 "Cannot find input for: dataset=%s item=%s ftime=%s igrib=%d" 
  799                 "atime=%s in %s"%(dataset, item, ftime.strftime(
"%Y%m%d%H"),
 
  800                                   stratime, repr(self.
inputs),igrib ))
 
  803         """!Links or copies all the input GRIB files to the current 
  804         working directory.  Note that if two grib files are requested, 
  805         then this is done by copying the data. 
  806         @param require if True, then hwrf.exceptions.UngribNoInput is 
  807          raised when inputs cannot be found.""" 
  813         for t 
in self.
times():
 
  815             fhr=int(math.ceil(to_fraction(dt)/3600)) 
 
  816             logger.info(
'Need to get input for t-start=%s-%s=%s=hour %s ' 
  818                     repr(t),repr(start),repr(dt),repr(fhr),repr(igribmax)))
 
  819             opt2=self.
confbool(
'item2_optional',
False)
 
  820             for igrib 
in xrange(igribmax):
 
  823                 if igrib==1 
and opt2:
 
  825                 if f 
is None or f==
'':
 
  827                         "Cannot find input for hour %d"%(fhr,))
 
  830                 logger.info(
'Input for hour %s is %s'%(repr(fhr),repr(f)))
 
  833             if not wait_for_files(
 
  835                 maxwait=self.
confint(
'max_grib_wait',1800),
 
  836                 sleeptime=self.
confint(
'grib_sleep_time',20),
 
  837                 min_size=self.
confint(
'min_grib_size',1),
 
  838                 min_mtime_age=self.
confint(
'min_grib_age',30),
 
  843                 logger.error(
'Some input GRIB files do not exist.  Giving ' 
  846                     'Some GRIB files are missing.  See the log for details.')
 
  850             logger.info(
'Merging GRIB files.')
 
  851             for t 
in self.
times():
 
  853                 fhr=int(math.ceil(to_fraction(dt)/3600)) 
 
  860                 logger.info(
'%s: merge GRIBs for time %s here' 
  861                             %(repr(out),repr(fhr)))
 
  862                 with open(out,
'wb') 
as outf:
 
  863                     logger.info(
'%s: copy from %s'%(repr(out),repr(in1)))
 
  864                     with open(in1,
'rb') 
as inf1:
 
  865                         shutil.copyfileobj(inf1,outf)
 
  867                         logger.info(
'%s: copy from %s'%(repr(out),repr(in2)))
 
  868                         with open(in2,
'rb') 
as inf2: 
 
  869                             shutil.copyfileobj(inf2,outf)
 
  870                     except EnvironmentError 
as e:
 
  871                         opt=self.
confbool(
'item2_optional',
False)
 
  873                             logger.warning(
'%s: ignoring exception'%(
 
  874                                     str(in2),),exc_info=
True)
 
  877                 logger.info(
'%s: done'%(repr(out),))
 
  879             logger.info(
'Not subsetting or merging GRIB files')
 
  881             for t 
in self.
times():
 
  886                 make_symlink(in1,out,force=
True,logger=self.
log())
 
  888         subset0=self.
confstr(
'subset',
'')
 
  889         subset1file=self.
confstr(
'subset_grib1',subset0)
 
  890         subset2file=self.
confstr(
'subset_grib2',subset0)
 
  891         if subset1file 
or subset2file:
 
  892             logger.info(
'Subsetting GRIB files')
 
  893             cmd2=alias(bigexe(self.
getexe(
'wgrib2',
'wgrib2')))
 
  894             cmd1=alias(bigexe(self.
getexe(
'wgrib')))
 
  899                 with open(subset1file,
'rt') 
as f:
 
  901                         if line: subset1.append(line.rstrip())
 
  902                 subset1_re=re.compile(
'|'.join(subset1))
 
  907                 with open(subset1file,
'rt') 
as f:
 
  909                         if line: subset2.append(line.rstrip())
 
  910                 subset2_re=re.compile(
'|'.join(subset2))
 
  914             for t 
in self.
times():
 
  916                 tgtfile=os.path.basename(srcfile)+
".subsetted" 
  929                     raise UngribInputUnknown(
 
  930                         "%s: is neither GRIB1 nor GRIB2."%(srcfile,))
 
  932                 if cmd 
is not None and subset_re 
is not None:
 
  933                     logger.info(
"%s: subsetting from %s"%(tgtfile,srcfile))
 
  935                     logger.info(
'%s: delete and replace with subset %s'%(srcfile,tgtfile))
 
  937                     os.rename(tgtfile,srcfile)
 
  939                     logger.info(
"%s: not subsetting"%(srcfile))
 
  941     def _subset_grib(self, srcfile,tgtfile,cmd,matcher):
 
  942         """!Runs wgrib on a GRIB1 input file to get its content 
  943         listing.  Then runs wgrib again to subset it. 
  944         * srcfile - the input file, to scan and subset 
  945         * tgtfile - the location for the new, subset file 
  946         * cmd - a produtil.prog.ImmutableRunner for wgrib 
  947         * matcher - a regular expression, or anything else that has 
  948             a .search method.  Each call to search takes one line and 
  949             returns True if it should be included in the subset, or 
  953         for line 
in runstr(cmd[srcfile],logger=self.
log()).splitlines(
True):
 
  954             if matcher.search(line):
 
  962         self.
log().info(
'%s => %s: keeping %d records, discarding %d'%(
 
  963                 srcfile,tgtfile,k,d))
 
  965         runme=cmd[
'-i',
'-grib',
'-o',tgtfile,srcfile] << subset
 
  966         self.
log().info(
'run: %s'%(repr(runme),))
 
  967         checkrun(runme,logger=self.
log())
 
  972     def _rename_grib(self, filename=None):
 
  973         """!Generates a GRIB filename using the input name expected by 
  974         WPS: GRIBFILE.AAA for the first, GRIBFILE.AAB for the second, 
  975         and so on.  An internal counter self._n_gribfiles keeps track 
  976         of the number of files requested so far.  The optional 
  977         filename argument is ignored.   
  978         @param filename Ignored. 
  979         @returns the new filename chosen.""" 
  980         sufs = [a+b+c 
for a,b,c 
in itertools.product(
 
  981                 string.ascii_uppercase, repeat = 3)]
 
  987         """!Links inputs and runs ungrib.  Ungrib has no deliverables: 
  988         it only places files in the local directory for a later 
  989         Metgrid.run() to pick up and use.""" 
  994                 logger.info(
'Ungrib starting in %s'%(os.getcwd(),))
 
  996                 assert(
not re.match(
'\A/tmp',os.getcwd()))
 
  998                 with open(
'namelist.wps', 
'w') 
as f:
 
 1004                 prog = self.
getexe(
'hwrf_ungrib')
 
 1008                 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
 
 1012         except Exception 
as e:
 
 1013             logger.critical(
'WPS Ungrib failed: '+str(e),exc_info=
True)
 
 1016         self.
state=COMPLETED
 
 1017         self.
postmsg(
'WPS Ungrib completed')
 
 1020         """!Checks the expected ungrib output files to make sure they 
 1021         all exist and are non-empty.""" 
 1024         for i 
in self.
times():
 
 1025             f = 
"%s/FILE:%s" %(self.
location, i.strftime(
"%Y-%m-%d_%H"))
 
 1027                 logger.info(
'%s: exists, is non-empty.'%(f,))
 
 1029                 logger.warning(
'%s: does not exist or is empty'%(f,))
 
 1032             logger.error(
'WPS Ungrib failed: some output files did not ' 
 1033                          'exist or were empty.  See stdout/stderr log for ' 
 1036                 "WPS Ungrib output file %s does not exist or is empty" %f)
 
 1038         """!Does nothing.  Ungrib has no products to deliver. 
 1039         @param args,kwargs Ignored.""" 
 1041         """!Ungrib delivers no products.  Everything is kept in the WPS 
 1042         temporary directory and reused by metgrid.  The Metgrid 
 1043         implementation assumes it is running in the same directory as 
 1044         Ungrib, so no call to products() is needed. 
 1045         @param kwargs Ignored.""" 
 1050     """!This is a HWRF task that horizontally interpolates the 
 1051     meteorological fields extracted by ungrib to the model grids 
 1052     defined by geogrid.  It is a wrapper around the WPS metgrid.exe 
 1056         """!Creates a new Metgrid.   
 1057         @param args,kwargs All arguments are passed to the 
 1058           WPSTask.__init__() constructor.""" 
 1059         super(Metgrid, self).
__init__(*args,**kwargs)
 
 1062         """!Copies inputs, runs metgrid, delivers outputs.   
 1064         @note Ungrib must have been run in the same directory 
 1065         beforehand.  The Geogrid must have been run in the same 
 1066         directory, unless it was passed as input via geogrid_from= in 
 1072                 logger.info(
'WPS Metgrid running in dir: '+os.getcwd())
 
 1073                 assert(
not re.match(
'\A/tmp',os.getcwd()))
 
 1075                 with open(
'namelist.wps', 
'w') 
as f:
 
 1082                 prog = self.
getexe(
'hwrf_metgrid')
 
 1087                 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
 
 1089                 for time 
in self.
times():
 
 1090                     logger.info(
'%s: deliver products for this time.' 
 1091                                 %(time.strftime(
'%Y-%m-%d %H:%M:%S')))
 
 1097                         make_symlink(p.location,pb.location,force=
True,
 
 1101         except Exception 
as e:
 
 1102             logger.critical(
'WPS Metgrid failed: %s'%(str(e),), 
 
 1106         self.
state=COMPLETED
 
 1107         self.
postmsg(
'WPS Metgrid completed')
 
 1109         """!Returns a FileProduct for the Metgrid file at the specified 
 1110         time or None if no such product is found 
 1111         @param when the time of interest""" 
 1117         """!Generates FileProduct objects for this Metgrid.  This is 
 1118         called automatically from the constructor.""" 
 1122         format = (
"%Y-%m-%d_%H_%M_%S" if self.
nocolons else \
 
 1123                       "%Y-%m-%d_%H:%M:%S")
 
 1124         for time 
in self.
times():
 
 1125             f=
"met_nmm.d01.%s.%s"%(time.strftime(format),suffix)
 
 1126             loc=os.path.join(self.
outdir,f)
 
 1133             fb=
"met_nmm.d01.%s.%s"%(timeb.strftime(format),suffix)
 
 1134             locb=os.path.join(self.
outdir,fb)
 
Change directory, handle temporary directories. 
 
This module provides a set of utility functions to do filesystem operations. 
 
def mpirun(arg, kwargs)
Converts an MPI program specification into a runnable shell program suitable for run(), runstr() or checkrun(). 
 
in_item2
Input item for the second GRIB source. 
 
def getsim(self)
The WRFSimulation object for which we are preparing input. 
 
Generates a Fortran namelist entirely from config files. 
 
def get_grib
Link or copies all the input GRIB files to the current working directory. 
 
def to_timedelta
Converts an object to a datetime.timedelta. 
 
def link_geogrid(self)
Links geogrid output from the specified Geogrid task to the current working directory. 
 
parent_atime
Parent model analysis time. 
 
def input_at
Finds input data for a specific time and GRIB file. 
 
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section. 
 
def gribs_per_time(self)
How many GRIB files are processed for each time? This is 2 if an item2 or in_item2 was given in the ...
 
Raised when wgrib or wgrib2 generates an empty or invalid file. 
 
def redirect(self)
Should subprograms' outputs be redirected to separate files? 
 
MOAD
The Mother Of All Domains (MOAD) as an hwrf.wrf.WRFDomain. 
 
Handles file locking using Python "with" blocks. 
 
taskname
Read-only property: the name of this task. 
 
def _wps_namelist(self)
Fills the self.nl namelist with correct information. 
 
A subclass of Product that represents file delivery. 
 
The base class of tasks run by the HWRF system. 
 
def remove_file
Deletes the specified file. 
 
def io_suffix(self, stream)
Guesses the file suffix (.nc, etc.) for the specified stream. 
 
Base class of WPS-related exceptions. 
 
This is a HWRF task that horizontally interpolates the meteorological fields extracted by ungrib to t...
 
conf
This HWRFTask's hwrf.config.HWRFConfig object. 
 
starttime
Simulation start time. 
 
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
 
in_item
Input item for the first GRIB source. 
 
def _rename_grib
Generates a GRIB filename using the input name expected by WPS: GRIBFILE.AAA for the first...
 
endtime
Simulation end time. 
 
in_dataset2
Input dataset for the second GRIB source. 
 
def check_outfiles(self)
Checks the expected ungrib output files to make sure they all exist and are non-empty. 
 
in_dataset
Input dataset for the first GRIB source. 
 
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper around run that raises ExitStatusException if the program exit status is non-...
 
def within_dt_epsilon(time1, time2, epsilon)
Returns True if time1 is within epsilon of time2, and False otherwise. 
 
def run(self)
Links inputs and runs ungrib. 
 
def __init__(self, args, kwargs)
Creates a new Metgrid. 
 
def confbool
Alias for self.conf.getbool for section self.section. 
 
section
The confsection in self.section for this HWRFTask (read-only) 
 
def gribver(filename)
What is the GRIB version of this file? 
 
Base class of tasks run by HWRF. 
 
A shell-like syntax for running serial, MPI and OpenMP programs. 
 
def run(self)
Copies inputs, links fix files, runs geogrid and delivers results. 
 
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section. 
 
def make_namelist(self)
Returns the namelist.wps contents as a string. 
 
outdir
The directory in which this task should deliver its final output. 
 
nocolons
Read-only property that guesses whether colons should be omitted from filenames (True) or not (False)...
 
def met_at_time(self, when)
Returns a FileProduct for the Metgrid file at the specified time or None if no such product is found...
 
nl
an hwrf.namelist.Conf2Namelist with the namelist.wps information 
 
Raised when a geogrid output file is missing or empty. 
 
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
 
def products(self, kwargs)
Ungrib delivers no products. 
 
in_ftime
Parent model forecast hour that maps to the analysis time of this model. 
 
def deliver_products
This is called from self.run to deliver all products to the intercom directory. 
 
Stores products and tasks in an sqlite3 database file. 
 
This subclass of TempDir takes a directory name, instead of generating one automatically. 
 
def makedirs
Make a directory tree, working around filesystem bugs. 
 
Time manipulation and other numerical routines. 
 
def make_product_structure(self, stream)
Generates the self._products data structure used by the products method. 
 
maxdom
The number of WRF domains. 
 
workdir
The directory in which this task should be run. 
 
def confint
Alias for self.conf.getint for section self.section. 
 
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
 
def conffloat
Alias for self.conf.getfloat for section self.section. 
 
def scrub(self)
Should temporary files be deleted as soon as they are not needed? 
 
def log
Obtain a logging domain. 
 
def _subset_grib(self, srcfile, tgtfile, cmd, matcher)
Runs wgrib on a GRIB1 input file to get its content listing. 
 
def inputiter(self)
Iterates over all input files needed. 
 
def make_products(self)
Generates FileProduct objects for this Metgrid. 
 
def deliver_products(self, args, kwargs)
Does nothing. 
 
A time-indexed array that can only handle equally spaced times. 
 
This subclass of HWRFTask represents a WPS Task. 
 
increment
Simulation boundary input interval. 
 
Sorts a pre-determined list of objects, placing unknown items at a specified location. 
 
def __init__(self, args, kwargs)
Creates a new Geogrid. 
 
This is a HWRF task that extracts the meteorological fields from GRIB-formatted files and writes the f...
 
def __init__(self, args, kwargs)
Creates a new Ungrib. 
 
This is a HWRF task that preprocesses the geogrid to define the model domains and interpolates stati...
 
def undeliver_products
Deletes all delivered products and marks them as unavailable. 
 
def products
Iterates over all products. 
 
def domains(self)
Iterates over the domains in this WPSTask. 
 
def run(self)
Copies inputs, runs metgrid, delivers outputs. 
 
Exceptions raised by the hwrf package. 
 
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
 
def confstr
Alias for self.conf.getstr for section self.section. 
 
def postmsg(self, message, args, kwargs)
Same as produtil.log.jlogger.info() 
 
def __init__(self, dstore, conf, section, sim, domains, taskname=None, wpsdir=None, starttime=None, increment=None, endtime=None, parent_atime=None, geogrid_from=None, kwargs)
Create a new WPSTask. 
 
def times(self)
Iterates over all output times. 
 
def check_geogrid(self)
Checks to see if the geogrid MOAD output file is present and non-empty in the current working directo...
 
def geodat(self, dom)
Returns the FileProduct for the geogrid data for the specified nesting ratio. 
 
def mpi(arg, kwargs)
Returns an MPIRank object that represents the specified MPI executable. 
 
def taskvars(self)
The dict of object-local values used for string substitution. 
 
def realtime(self)
Is this job a real-time forecast job? 
 
inputs
the hwrf.input.DataCatalog to use for obtaining input data 
 
in_atime
Parent model analysis time. 
 
def make_products(self)
This subroutine should be implemented in subclasses. 
 
def link_fix
Links all fix files for ungrib to the current working directory. 
 
def one_time(self)
If True, we are pretending that hour 0 is valid for all times. 
 
def make_products(self)
Creates the FileProduct objects for this Geogrid. 
 
def make_symlink
Creates a symbolic link "target" that points to "source".