1 """!This module contains Tasks to run the WRF Preprocessing System
2 (WPS): Geogrid, Ungrib and Metgrid."""
8 import os, shutil, collections, glob, time, math, re, itertools
9 import string, urlparse, datetime, collections
19 from hwrf.exceptions import WPSError, UngribSubsetError, GeogridNoOutput, \
20 TimestepTooShort, GeogridNoLog, UngribNoInput, UngribInputUnknown
21 from hwrf.numerics import partial_ordering, TimeArray, to_timedelta, \
22 to_datetime_rel, to_fraction, to_datetime
27 from collections
import defaultdict
70 'opt_geogrid_tbl_path',
81 'opt_metgrid_tbl_path',
91 """!This subclass of HWRFTask represents a WPS Task. Multiple WPS
92 jobs run in the same work directory. This class allows the jobs
93 to be represented together as a set, with one work directory
94 specified at the top level. This class exists only to reduce code
95 duplication by letting Metgrid, Geogrid and Ungrib share a
97 def __init__(self, dstore, conf, section, sim, domains, taskname=None,
98 wpsdir=
None, starttime=
None, increment=
None, endtime=
None,
99 parent_atime=
None, geogrid_from=
None, **kwargs):
100 """!Create a new WPSTask.
102 @param dstore The produtil.datastore.Datastore for database information
103 @param conf the hwrf.config.HWRFConfig for configuration info
104 @param section the section to use in conf
105 @param sim The "sim" is the
106 WRFSimulation for which we are preparing input.
107 @param domains The domains must be a list of WRFDomain
109 @param taskname Optional: the name of this Task. Default: config section name.
110 @param wpsdir Optional: the directory in which to run WPS.
111 Default: taskname subdirectory of conf.getdir("WORKhwrf")
113 @param starttime,endtime Optional: simulation length. Default: same as sim.
114 @param increment Optional: boundary input interval. Default: get from sim.
115 @param parent_atime Optional: analysis time of parent model. Default: starttime
116 @param geogrid_from Optional: a Geogrid object whose output
117 should be linked before running this WPS step. This is
118 used when the WPS Ungrib and Metgrid are run more than
119 once on the same domain. For example, one may run
120 multiple analysis times or multiple forecast lengths off
121 of the same geogrid output.
122 @param kwargs Other options are sent to the hwrf.hwrftask.HWRFTask.__init__ constructor.
124 if taskname
is None: taskname=section
125 super(WPSTask,self).
__init__(dstore,conf,section,taskname,**kwargs)
129 self.
_domains=[sim[domain]
for domain
in domains]
132 if geogrid_from
is not None:
133 if not isinstance(geogrid_from,Geogrid):
135 'The geogrid_from parameter to WPSTask.__init__ must '
136 'be a Geogrid. You provided a %s %s.'
137 %(type(geogrid_from).__name__,repr(geogrid_from)))
140 if starttime
is None: starttime=sim.simstart()
142 if endtime
is None: endtime=sim.simend()
143 self.
endtime=to_datetime_rel(endtime,starttime)
144 self.
increment=increment
if increment
is not None else 6*3600
146 if parent_atime
is None:
147 parent_atime=starttime
150 raise TimestepTooShort(
151 'Geogrid timestep %s is smaller than 300 seconds.'
153 self.
log().debug(
'%s times: start=%s end=%s increment=%s'%(
174 def _guess_nocolons(self):
175 """!Guesses whether we should use colons in filenames. This
176 information is obtained from sim.get_nocolons(). Note that
177 this requires re-scanning all input and output io_forms, so
178 this function is moderately expensive."""
179 nc=self._sim.get_nocolons()
185 nocolons=property(_guess_nocolons,
None,
None, \
186 """A property that is True if WPS will omit colons from the
187 output filenames, and False otherwise.""")
190 """!Guesses the file suffix (.nc, etc.) for the specified stream.
192 Tries to guess what io suffix WPS will use for a stream. If
193 the stream is not one of the known streams, this function logs
194 a warning and tries to continue, but will likely fail.
196 @param stream the stream of interest.
198 if(stream==
'metgrid' or stream==
'auxinput1'):
199 return self._sim.get_io_suffix(
'auxinput1')
200 elif(stream==
'geogrid' or stream==
'auxinput2'):
201 return self._sim.get_io_suffix(
'auxinput2')
204 'caller requested unknown stream "%s" in WPSTask.io_suffix'
206 return self._sim.get_io_suffix(
'input')
209 """!The number of WRF domains, and highest domain number."""
210 return self._sim.maxdom()
214 maxdom=property(_maxdom,
None,
None,
"The number of WRF domains.")
216 """!The mother of all domains as a WRFDomain."""
217 return self._sim.get(1)
221 MOAD=property(_MOAD,
None,
None,
"Returns the Mother of All Domains (MOAD)")
224 """!The WRFSimulation object for which we are preparing input."""
228 """!Checks to see if the geogrid MOAD output file is present
229 and non-empty in the current working directory. Raises
230 GeogridNoOutput if the output is missing or empty."""
232 filename=
'geo_nmm.d01.%s'%(suffix,)
238 """!Links geogrid output from the specified Geogrid task to the
239 current working directory. Will raise an exception if Geogrid
240 has not produced its outputs yet. This is used when multiple
241 Metgrid executions are used with the same Geogrid. The
242 geogrid only needs to be run once since its outputs do not
243 change with time, just with domain configuration.
245 Specifically, this finds all Product objects in task
246 self._geogrid_from whose prodname contains "geo" and links
247 from the product's location to its prodname in the current
248 working directory. This logic must match the logic in
249 Geogrid.make_products and WPSTask.check_geogrid."""
254 logger.info(
'Not linking geogrid inputs. I hope you already '
255 'ran geogrid in this directory.')
258 logger.info(
'Task %s linking geo products from %s task %s'
259 %(self.
taskname,type(fromtask).__name__,
262 for product
in fromtask.products():
264 name=product.prodname
265 if name.find(
'geo')>=0:
266 logger.info(did+
': should copy to '+name)
268 logger.info(did+
': does not contain "geo" so skipping this.')
276 msg=did+
': product is not available'
277 elif loc
is None or loc==
'':
278 msg=did+
': product has no location (but is available)'
279 elif not os.path.exists(loc):
280 msg=did+
': file does not exist: '+repr(loc)
282 msg=did+
': file is empty: '+repr(loc)
288 loc,name,force=
True,logger=logger)
290 """!Iterates over all output times."""
300 sim=property(getsim,
None,
None,
301 """Returns the WRF simulation for this WPSTask""")
303 """!Iterates over the domains in this WPSTask"""
304 for domain
in self.
_domains:
yield domain
306 """!Links all fix files for ungrib to the current working directory.
307 @param geog_data if True, also link the geog-data directory
308 @param table name of the table file symbolic link"""
314 logger.info(
'Link table file %s to %s'%(tblhere,tbl))
315 make_symlink(tbl,tblhere,force=
True,logger=logger)
317 make_symlink(self.
getdir(
'geog_data'),
'geog-data',force=
True,
319 except Exception
as e:
321 logger.warning(
'cannot link to WPS fix files: '+str(e))
324 """!Generates the self._products data structure used by the
325 products method. Should be called by make_products in
336 """!This subroutine should be implemented in subclasses. It
337 should call self.make_product_structure(), and then add
340 self._products[domain.moad_ratio()][time]=product
342 where the domain is a WRFDomainBase subclass, the time is
343 anything accepted by to_datetime_rel's first argument, and the
344 product is any subclass of Product."""
346 """!Iterates over all products
347 @param domain the hwrf.wrf.WRFDomain
349 @note One, both or neither argument can be specified. All matching
350 products are yielded."""
351 if self._products
is not None:
352 if domain
is not None and domain.moad_ratio()
in self._products:
353 tprod=self._products[domain.moad_ratio()]
355 for (time,product)
in tprod.itervalues():
357 assert(localprod
is not None)
360 localprod=tprod[time]
361 assert(localprod
is not None)
366 for tprod
in self._products.itervalues():
367 for thetime,product
in tprod.iteritems():
370 thetime,time,epsilon):
374 assert(product
is not None)
378 """!Returns the namelist.wps contents as a string."""
379 return self.nl.remove_traits().set_sorters(_wps_namelist_order,
382 """!Deletes all delivered products and marks them as
384 @param time,domain passed to products() to get the list of products
385 @param fromloc Ignored."""
388 prodlist=[p
for p
in self.
products(time=time,domain=domain)]
390 p.undeliver(logger=logger)
392 keep=
True,relink=
False):
393 """!This is called from self.run to deliver all products to the
396 @param time,domain The optional time and domain arguments
397 are sent to self.products.
398 @param fromloc By default, this routine assumes that the file
399 to be delivered is in the current working directory with the
400 same name as the destination file. To change that, give a
401 lambda function in "fromloc", which converts the destination
402 filename (the only argument) to a local filename.
403 @param keep The "keep" argument has the same meaning as in
404 deliver_file: if False, the file may be moved to the
406 @param relink If True, and the file is moved, then a symlink
407 will be made from the original file location to the
409 link_files=self.
confbool(
'link_wrf_fix',
True)
410 keep=keep
and link_files
415 if fromloc
is None: fromloc=os.path.basename
416 prodlist=[p
for p
in self.
products(time=time,domain=domain)]
420 if f
is None or len(f)<1:
422 msg=
'%s: product has no location; cannot deliver it'%(
427 assert(isinstance(fl,basestring))
429 p.deliver(frominfo=fl,keep=keep,logger=logger)
430 if not keep
and not os.path.exists(fl):
433 logger.info(
'%s: file was moved to destination %s'
434 %(linkfile,destfile))
436 logger.info(
'%s: relink from delivered file %s'
437 %(linkfile,destfile))
438 make_symlink(destfile,linkfile,force=
True,
441 logger.info(
'%s: not relinking. File now at %s'
442 %(linkfile,destfile))
443 except EnvironmentError
as e:
444 logger.warning(
'%s: cannot deliver file'%(f,),exc_info=
True)
447 logger.critical(
'some output files were empty or missing; '
449 def _wps_namelist(self):
450 """!Fills the self.nl namelist with correct information. This
451 must be called before make_namelist, and hence it is called
452 from the constructor."""
454 siu = self.nl.nl_set_if_unset
455 t = self.nl.trait_get
457 start = self.starttime.strftime(
"%Y-%m-%d_%H:%M:%S")
460 s(
'share',
'max_dom', maxdom)
461 s(
'share',
'start_date', start)
462 s(
'share',
'end_date', end)
463 s(
'share',
'interval_seconds', int(self._conf.get(
'wrf',
'bdystep')))
464 io_form_geogrid=int(self.sim.io_form_for(
'auxinput2'))%100
465 io_form_metgrid=int(self.sim.io_form_for(
'auxinput1'))%100
466 if io_form_metgrid==11: io_form_metgrid=2
467 if io_form_geogrid==11: io_form_geogrid=2
468 s(
'share',
'io_form_geogrid', io_form_geogrid)
469 s(
'metgrid',
'io_form_metgrid', io_form_metgrid)
483 resolution =
'10m' if (maxdom<3)
else '2m'
489 dx.append(d.nl.trait_get(
'dx'))
490 dy.append(d.nl.trait_get(
'dy'))
494 pid.append(d.parent.get_grid_id())
495 pgr.append(d.nl.trait_get(
'parent_grid_ratio'))
496 start=str(d.nl.trait_get(
'start',
'auto')).lower()
498 istart.append(int(d.nl.trait_get(
'istart')))
499 jstart.append(int(d.nl.trait_get(
'jstart')))
504 gid.append(d.get_grid_id())
505 nx.append(d.nl.trait_get(
'nx'))
506 ny.append(d.nl.trait_get(
'ny'))
507 res.append(resolution)
512 s (
'geogrid',
'parent_id', pid)
513 s (
'geogrid',
'parent_grid_ratio', pgr)
514 s (
'geogrid',
'e_we', nx)
515 s (
'geogrid',
'e_sn', ny)
516 s (
'geogrid',
'dx', dx)
517 s (
'geogrid',
'dy', dy)
518 s (
'geogrid',
'i_parent_start', istart)
519 siu(
'geogrid',
'geog_data_res', res)
520 siu(
'geogrid',
'map_proj',
'rotated_ll')
521 s (
'geogrid',
'geog_data_path',
'./geog-data/')
522 s (
'geogrid',
'opt_geogrid_tbl_path',
'./')
523 s (
'geogrid',
'j_parent_start', jstart)
524 s (
'geogrid',
'ref_lat', self.
conffloat(
'domlat',section=
'config'))
525 s (
'geogrid',
'ref_lon', self.
conffloat(
'domlon',section=
'config'))
527 s (
'metgrid',
'opt_metgrid_tbl_path',
'./')
528 s (
'metgrid',
'fg_name',
'FILE')
530 s (
'ungrib',
'prefix',
'FILE')
531 siu(
'ungrib',
'out_format',
'WPS')
533 siu(
'share',
'wrf_core',
'NMM')
537 """!This is a HWRF task that preprocesses the geogrid to define the
538 model domains and interpolates static geographical data to the
542 """!Creates a new Geogrid.
543 @param args,kwargs All arguments are passed to the
544 WPSTask.__init__() constructor."""
545 super(Geogrid, self).
__init__(*args,**kwargs)
548 """!Creates the FileProduct objects for this Geogrid.
550 @note Implementation note: If you change the list of products,
551 make sure all geogrid outputs needed as input to ungrib or
552 metgrid have a prodname that is the same as the destination
560 f =
"geo_nmm.d%02d.%s" %(id, suffix)
562 f =
"geo_nmm_nest.l%02d.%s" %(id -1, suffix)
563 dest=os.path.join(self.
outdir,f)
565 meta={
"domain": d.name}, location=dest)
567 self.
log().debug(
'geogrid made product %s with location %s'
568 %(repr(prod.did),repr(prod.location)))
569 self.
_products[d.moad_ratio()][time]=prod
571 """!Returns the FileProduct for the geogrid data for the
572 specified nesting ratio. The specified domain does not have
573 to be one of the known domains. It just has to have the same
574 nest:parent ratio (WRFDomain.moad_ratio) as one.
575 @param dom The hwrf.wrf.WRFDomain of interest."""
576 ratio=dom.moad_ratio()
580 """!Copies inputs, links fix files, runs geogrid and delivers
586 logger.info(
'Geogrid running in directory: '+os.getcwd())
587 assert(
not re.match(
'\A/tmp',os.getcwd()))
589 for f
in glob.glob(
'geo*'):
592 except(EnvironmentError)
as e:
593 logger.warning(
'%s: did not remove file, but '
594 'continuing anyway'%(f,))
595 self.
link_fix(geog_data=
True,table=
'GEOGRID.TBL')
597 with open(
'namelist.wps',
'w')
as f:
600 prog = self.
getexe(
'hwrf_geogrid')
605 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
607 findme=
"Successful completion"
609 for glog
in (
'geogrid.log',
'geogrid.log.0000',
610 'geogrid.log.00000' ):
611 if os.path.exists(glog):
614 logger.info(
'%s: does not exist.'%(glog,))
615 if geogrid_log
is None:
616 msg=
'WPS Geogrid failed: could not find geogrid log file.'
619 raise GeogridNoLog(msg)
620 logger.info(
'%s: will check for %s'%(geogrid_log,findme))
621 if not check_last_lines(geogrid_log,findme):
622 raise WPSError(
'%s: did not find "%s"'
623 %(geogrid_log,findme))
626 except WPSError
as we:
629 except Exception
as e:
630 logger.critical(
'WPS Geogrid failed: '+str(e),exc_info=
True)
634 self.
postmsg(
'WPS Geogrid completed.')
638 """!This is a HWRF task that extracts the meteorological fields
639 from GRIB formatted files and writes the fields to intermediate
640 files. It is a wrapper around the WPS ungrib.exe program."""
643 """!Creates a new Ungrib. All arguments are passed to the
644 WPSTask constructor. The only new arguments are:
646 * in_ftime - Optional: the parent model forecast hour to use
647 for analysis time data in this Ungrib.
649 * in_item, in_dataset - Optional: the item and dataset, in
650 hwrf.input terminology, to use to request the GRIB1/2
653 * in_item2, in_dataset2 - Optional: same as in_item and
654 in_dataset, but for a second GRIB file that is appended to
657 * inputs - Optional: an hwrf.input.DataCatalog to provide
658 input file locations. Default: construct one using the
659 catalog name from the "catalog" option in this object's
662 @param args,kwargs passed to WPSTask.__init__ """
663 super(Ungrib, self).
__init__(*args,**kwargs)
664 if 'inputs' in kwargs
and kwargs[
'inputs']:
667 hd=self.
confstr(
'catalog',
'hwrfdata')
669 self.
conf,hd,self.sim.simstart())
670 self.
__one_time=bool(kwargs.get(
'one_time',
False))
673 in_atime=kwargs[
'in_atime'] if(
'in_atime' in kwargs)
else \
675 if in_atime
is None or in_atime==
'':
678 self.
in_atime=to_datetime_rel(in_atime,self.sim.simstart())
681 in_ftime=kwargs[
'in_ftime'] if(
'in_ftime' in kwargs)
else None
682 if in_ftime
is None: in_ftime=self.
confint(
'ftime',0)
683 if isinstance(in_ftime,datetime.datetime):
684 in_ftime=in_ftime-in_atime
685 if not isinstance(in_ftime,int)
and not isinstance(in_ftime,float) \
686 and not isinstance(in_ftime,datetime.timedelta):
687 raise TypeError(
'in_ftime must be an int, a float or a '
688 'timedelta, not a %s (in_ftime=%s)'%(
689 in_ftime.__class__.__name__,repr(in_ftime)))
693 'in_dataset',self.
confstr(
'dataset',
'hwrfdata')))
697 'in_item',self.
confstr(
'item',
'gfs_pgrb')))
699 'in_item2',self.
confstr(
'item2',
'')))
700 self.
log().debug(
'self.in_item=%s dataset=%s section=%s'%(
736 """!If True, we are pretending that hour 0 is valid for all
737 times. This is equivalent to constant boundary conditions.
738 If in_ftime is non-zero, then that hour is used instead."""
743 """!How many grib files are processed for each time? This is 2
744 if an item2 or in_item2 were given in the config or
745 constructor, respectively. Otherwise it is 1."""
751 """!Iterates over all input files needed. This is meant to be
752 plugged in to an hwrf.input.InputSource to obtain input data
753 in the scripts.exhwrf_input job."""
755 for t
in self.
times():
758 ftime=to_datetime_rel(dt,to_datetime_rel(self.
in_ftime,
765 optional=self.
confbool(
'item2_optional',
False))
768 """!Finds input data for a specific time and GRIB file
769 @param dt the forecast time as a datetime.datetime, relative
771 @param igrib 1 or 2, which input file is used (in_item or in_item2).
772 This is needed for models like GFS and GEFS that split their
773 GRIB files into two parts.
774 @param require if True, then hwrf.exceptions.UngribNoInput is
775 raised when inputs cannot be found."""
780 ftime=to_datetime_rel(dt,to_datetime_rel(self.
in_ftime,
782 stratime=self.in_atime.strftime(
"%Y%m%d%H")
783 logger.info(
"Check for dataset=%s item=%s ftime=%s atime=%s in %s"%(
784 dataset, item, ftime.strftime(
"%Y%m%d%H"), stratime,
786 logger.debug(
'inputs: '+repr(self.
inputs))
795 self.
log().info(
"Got back: "+repr(ret))
796 if require
and (ret
is None or ret==
''):
798 "Cannot find input for: dataset=%s item=%s ftime=%s igrib=%d"
799 "atime=%s in %s"%(dataset, item, ftime.strftime(
"%Y%m%d%H"),
800 stratime, repr(self.
inputs),igrib ))
803 """!Links or copies all the input GRIB files to the current
804 working directory. Note that if two grib files are requested,
805 then this is done by copying the data.
806 @param require if True, then hwrf.exceptions.UngribNoInput is
807 raised when inputs cannot be found."""
813 for t
in self.
times():
815 fhr=int(math.ceil(to_fraction(dt)/3600))
816 logger.info(
'Need to get input for t-start=%s-%s=%s=hour %s '
818 repr(t),repr(start),repr(dt),repr(fhr),repr(igribmax)))
819 opt2=self.
confbool(
'item2_optional',
False)
820 for igrib
in xrange(igribmax):
823 if igrib==1
and opt2:
825 if f
is None or f==
'':
827 "Cannot find input for hour %d"%(fhr,))
830 logger.info(
'Input for hour %s is %s'%(repr(fhr),repr(f)))
833 if not wait_for_files(
835 maxwait=self.
confint(
'max_grib_wait',1800),
836 sleeptime=self.
confint(
'grib_sleep_time',20),
837 min_size=self.
confint(
'min_grib_size',1),
838 min_mtime_age=self.
confint(
'min_grib_age',30),
843 logger.error(
'Some input GRIB files do not exist. Giving '
846 'Some GRIB files are missing. See the log for details.')
850 logger.info(
'Merging GRIB files.')
851 for t
in self.
times():
853 fhr=int(math.ceil(to_fraction(dt)/3600))
860 logger.info(
'%s: merge GRIBs for time %s here'
861 %(repr(out),repr(fhr)))
862 with open(out,
'wb')
as outf:
863 logger.info(
'%s: copy from %s'%(repr(out),repr(in1)))
864 with open(in1,
'rb')
as inf1:
865 shutil.copyfileobj(inf1,outf)
867 logger.info(
'%s: copy from %s'%(repr(out),repr(in2)))
868 with open(in2,
'rb')
as inf2:
869 shutil.copyfileobj(inf2,outf)
870 except EnvironmentError
as e:
871 opt=self.
confbool(
'item2_optional',
False)
873 logger.warning(
'%s: ignoring exception'%(
874 str(in2),),exc_info=
True)
877 logger.info(
'%s: done'%(repr(out),))
879 logger.info(
'Not subsetting or merging GRIB files')
881 for t
in self.
times():
886 make_symlink(in1,out,force=
True,logger=self.
log())
888 subset0=self.
confstr(
'subset',
'')
889 subset1file=self.
confstr(
'subset_grib1',subset0)
890 subset2file=self.
confstr(
'subset_grib2',subset0)
891 if subset1file
or subset2file:
892 logger.info(
'Subsetting GRIB files')
893 cmd2=alias(bigexe(self.
getexe(
'wgrib2',
'wgrib2')))
894 cmd1=alias(bigexe(self.
getexe(
'wgrib')))
899 with open(subset1file,
'rt')
as f:
901 if line: subset1.append(line.rstrip())
902 subset1_re=re.compile(
'|'.join(subset1))
907 with open(subset1file,
'rt')
as f:
909 if line: subset2.append(line.rstrip())
910 subset2_re=re.compile(
'|'.join(subset2))
914 for t
in self.
times():
916 tgtfile=os.path.basename(srcfile)+
".subsetted"
929 raise UngribInputUnknown(
930 "%s: is neither GRIB1 nor GRIB2."%(srcfile,))
932 if cmd
is not None and subset_re
is not None:
933 logger.info(
"%s: subsetting from %s"%(tgtfile,srcfile))
935 logger.info(
'%s: delete and replace with subset %s'%(srcfile,tgtfile))
937 os.rename(tgtfile,srcfile)
939 logger.info(
"%s: not subsetting"%(srcfile))
941 def _subset_grib(self, srcfile,tgtfile,cmd,matcher):
942 """!Runs wgrib on a GRIB1 input file to get its content
943 listing. Then runs wgrib again to subset it.
944 * srcfile - the input file, to scan and subset
945 * tgtfile - the location for the new, subset file
946 * cmd - a produtil.prog.ImmutableRunner for wgrib
947 * matcher - a regular expression, or anything else that has
948 a .search method. Each call to search takes one line and
949 returns True if it should be included in the subset, or
953 for line
in runstr(cmd[srcfile],logger=self.
log()).splitlines(
True):
954 if matcher.search(line):
962 self.
log().info(
'%s => %s: keeping %d records, discarding %d'%(
963 srcfile,tgtfile,k,d))
965 runme=cmd[
'-i',
'-grib',
'-o',tgtfile,srcfile] << subset
966 self.
log().info(
'run: %s'%(repr(runme),))
967 checkrun(runme,logger=self.
log())
972 def _rename_grib(self, filename=None):
973 """!Generates a GRIB filename using the input name expected by
974 WPS: GRIBFILE.AAA for the first, GRIBFILE.AAB for the second,
975 and so on. An internal counter self._n_gribfiles keeps track
976 of the number of files requested so far. The optional
977 filename argument is ignored.
978 @param filename Ignored.
979 @returns the new filename chosen."""
980 sufs = [a+b+c
for a,b,c
in itertools.product(
981 string.ascii_uppercase, repeat = 3)]
987 """!Links inputs and runs ungrib. Ungrib has no deliverables:
988 it only places files in the local directory for a later
989 Metgrid.run() to pick up and use."""
994 logger.info(
'Ungrib starting in %s'%(os.getcwd(),))
996 assert(
not re.match(
'\A/tmp',os.getcwd()))
998 with open(
'namelist.wps',
'w')
as f:
1004 prog = self.
getexe(
'hwrf_ungrib')
1008 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
1012 except Exception
as e:
1013 logger.critical(
'WPS Ungrib failed: '+str(e),exc_info=
True)
1016 self.
state=COMPLETED
1017 self.
postmsg(
'WPS Ungrib completed')
1020 """!Checks the expected ungrib output files to make sure they
1021 all exist and are non-empty."""
1024 for i
in self.
times():
1025 f =
"%s/FILE:%s" %(self.
location, i.strftime(
"%Y-%m-%d_%H"))
1027 logger.info(
'%s: exists, is non-empty.'%(f,))
1029 logger.warning(
'%s: does not exist or is empty'%(f,))
1032 logger.error(
'WPS Ungrib failed: some output files did not '
1033 'exist or were empty. See stdout/stderr log for '
1036 "WPS Ungrib output file %s does not exist or is empty" %f)
1038 """!Does nothing. Ungrib has no products to deliver.
1039 @param args,kwargs Ignored."""
1041 """!Ungrib delivers no products. Everything is kept in the WPS
1042 temporary directory and reused by metgrid. The Metgrid
1043 implementation assumes it is running in the same directory as
1044 Ungrib, so no call to products() is needed.
1045 @param kwargs Ignored."""
1050 """!This is a HWRF task that horizontally interpolates the
1051 meteorological fields extracted by ungrib to the model grids
1052 defined by geogrid. It is a wrapper around the WPS metgrid.exe
1056 """!Creates a new Metgrid.
1057 @param args,kwargs All arguments are passed to the
1058 WPSTask.__init__() constructor."""
1059 super(Metgrid, self).
__init__(*args,**kwargs)
1062 """!Copies inputs, runs metgrid, delivers outputs.
1064 @note Ungrib must have been run in the same directory
1065 beforehand. The Geogrid must have been run in the same
1066 directory, unless it was passed as input via geogrid_from= in
1072 logger.info(
'WPS Metgrid running in dir: '+os.getcwd())
1073 assert(
not re.match(
'\A/tmp',os.getcwd()))
1075 with open(
'namelist.wps',
'w')
as f:
1082 prog = self.
getexe(
'hwrf_metgrid')
1087 logger.info(
'%s command: %s'%(self.
taskname, repr(cmd),))
1089 for time
in self.
times():
1090 logger.info(
'%s: deliver products for this time.'
1091 %(time.strftime(
'%Y-%m-%d %H:%M:%S')))
1097 make_symlink(p.location,pb.location,force=
True,
1101 except Exception
as e:
1102 logger.critical(
'WPS Metgrid failed: %s'%(str(e),),
1106 self.
state=COMPLETED
1107 self.
postmsg(
'WPS Metgrid completed')
1109 """!Returns a FileProduct for the Metgrid file at the specified
1110 time or None if no such product is found
1111 @param when the time of interest"""
1117 """!Generates FileProduct objects for this Metgrid. This is
1118 called automatically from the constructor."""
1122 format = (
"%Y-%m-%d_%H_%M_%S" if self.
nocolons else \
1123 "%Y-%m-%d_%H:%M:%S")
1124 for time
in self.
times():
1125 f=
"met_nmm.d01.%s.%s"%(time.strftime(format),suffix)
1126 loc=os.path.join(self.
outdir,f)
1133 fb=
"met_nmm.d01.%s.%s"%(timeb.strftime(format),suffix)
1134 locb=os.path.join(self.
outdir,fb)
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def mpirun(arg, kwargs)
Converts an MPI program specification into a runnable shell program suitable for run(), runstr() or checkrun().
in_item2
Input item for the second GRIB source.
def getsim(self)
The WRFSimulation object for which we are preparing input.
Generates a Fortran namelist entirely from config files.
def get_grib
Links or copies all the input GRIB files to the current working directory.
def to_timedelta
Converts an object to a datetime.timedelta.
def link_geogrid(self)
Links geogrid output from the specified Geogrid task to the current working directory.
parent_atime
Parent model analysis time.
def input_at
Finds input data for a specific time and GRIB file.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def gribs_per_time(self)
How many grib files are processed for each time? This is 2 if an item2 or in_item2 were given in the ...
Raised when wgrib or wgrib2 generates an empty or invalid file.
def redirect(self)
Should subprograms' outputs be redirected to separate files?
MOAD
The Mother Of All Domains (MOAD) as an hwrf.wrf.WRFDomain.
Handles file locking using Python "with" blocks.
taskname
Read-only property: the name of this task.
def _wps_namelist(self)
Fills the self.nl namelist with correct information.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
def remove_file
Deletes the specified file.
def io_suffix(self, stream)
Guesses the file suffix (.nc, etc.) for the specified stream.
Base class of WPS-related exceptions.
This is a HWRF task that horizontally interpolates the meteorological fields extracted by ungrib to t...
conf
This HWRFTask's hwrf.config.HWRFConfig object.
starttime
Simulation start time.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
in_item
Input item for the first GRIB source.
def _rename_grib
Generates a GRIB filename using the input name expected by WPS: GRIBFILE.AAA for the first...
endtime
Simulation end time.
in_dataset2
Input dataset for the second GRIB source.
def check_outfiles(self)
Checks the expected ungrib output files to make sure they all exist and are non-empty.
in_dataset
Input dataset for the first GRIB source.
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
def within_dt_epsilon(time1, time2, epsilon)
Returns True if time1 is within epsilon of time2, and False otherwise.
def run(self)
Links inputs and runs ungrib.
def __init__(self, args, kwargs)
Creates a new Metgrid.
def confbool
Alias for self.conf.getbool for section self.section.
section
The confsection in self.section for this HWRFTask (read-only)
def gribver(filename)
What is the GRIB version of this file?
Base class of tasks run by HWRF.
A shell-like syntax for running serial, MPI and OpenMP programs.
def run(self)
Copies inputs, links fix files, runs geogrid and delivers results.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
def make_namelist(self)
Returns the namelist.wps contents as a string.
outdir
The directory in which this task should deliver its final output.
nocolons
Read-only property that guesses whether colons should be omitted from filenames (True) or not (False)...
def met_at_time(self, when)
Returns a FileProduct for the Metgrid file at the specified time or None if no such product is found...
nl
an hwrf.namelist.Conf2Namelist with the namelist.wps information
Raised when a geogrid output file is missing or empty.
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
def products(self, kwargs)
Ungrib delivers no products.
in_ftime
Parent model forecast hour that maps to the analysis time of this model.
def deliver_products
This is called from self.run to deliver all products to the intercom directory.
Stores products and tasks in an sqlite3 database file.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
def make_product_structure(self, stream)
Generates the self._products data structure used by the products method.
maxdom
The number of WRF domains.
workdir
The directory in which this task should be run.
def confint
Alias for self.conf.getint for section self.section.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
def conffloat
Alias for self.conf.getfloat for section self.section.
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
def log
Obtain a logging domain.
def _subset_grib(self, srcfile, tgtfile, cmd, matcher)
Runs wgrib on a GRIB1 input file to get its content listing.
def inputiter(self)
Iterates over all input files needed.
def make_products(self)
Generates FileProduct objects for this Metgrid.
def deliver_products(self, args, kwargs)
Does nothing.
A time-indexed array that can only handle equally spaced times.
This subclass of HWRFTask represents a WPS Task.
increment
Simulation boundary input interval.
Sorts a pre-determined list of objects, placing unknown items at a specified location.
def __init__(self, args, kwargs)
Creates a new Geogrid.
This is a HWRF task that extracts the meteorological fields from GRIB formatted files and writes the f...
def __init__(self, args, kwargs)
Creates a new Ungrib.
This is a HWRF task that preprocesses the geogrid to define the model domains and interpolates stati...
def undeliver_products
Deletes all delivered products and marks them as unavailable.
def products
Iterates over all products.
def domains(self)
Iterates over the domains in this WPSTask.
def run(self)
Copies inputs, runs metgrid, delivers outputs.
Exceptions raised by the hwrf package.
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
def confstr
Alias for self.conf.getstr for section self.section.
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
def __init__(self, dstore, conf, section, sim, domains, taskname=None, wpsdir=None, starttime=None, increment=None, endtime=None, parent_atime=None, geogrid_from=None, kwargs)
Create a new WPSTask.
def times(self)
Iterates over all output times.
def check_geogrid(self)
Checks to see if the geogrid MOAD output file is present and non-empty in the current working directo...
def geodat(self, dom)
Returns the FileProduct for the geogrid data for the specified nesting ratio.
def mpi(arg, kwargs)
Returns an MPIRank object that represents the specified MPI executable.
def taskvars(self)
The dict of object-local values used for string substitution.
def realtime(self)
Is this job a real-time forecast job?
inputs
the hwrf.input.DataCatalog to use for obtaining input data
in_atime
Parent model analysis time.
def make_products(self)
This subroutine should be implemented in subclasses.
def link_fix
Links all fix files for ungrib to the current working directory.
def one_time(self)
If True, we are pretending that hour 0 is valid for all times.
def make_products(self)
Creates the FileProduct objects for this Geogrid.
def make_symlink
Creates a symbolic link "target" that points to "source".