1 import fractions,math,re,datetime,os
6 Product, FileProduct, UpstreamFile
# Names exported when this module is star-imported.
__all__ = [
    'ForecastWatcher',
    'FakeInit',
    'RealInit',
]
20 """Links real_nmm wrfinput_d01 file or returns the FileProduct,
21 regardless of the time and domain, in the current directory for
23 def __init__(self,stormid,src,domain,atime):
24 super(Input2FinalMerge,self).__init__(stormid,src)
27 def get_inputs(self,logger,just_check=False, stormid=None, **kwargs):
28 if stormid
is not None and stormid!=self.
stormid:
29 if logger
is not None:
31 'Wrong stormid requested: %s instead of %s, cannot link.'
34 if self.
src is not None:
38 if not p.available: p.check()
41 just_check=just_check)
51 def __init__(self,ds,conf,fakeid,section,wrf,taskname,**kwargs):
52 workdir=conf.strinterp(
'dir',
'{realstormwork}/{taskname}',
53 realstorm=fakeid,taskname=taskname)
57 with ds.transaction()
as tx:
58 super(ForecastWatcher,self).__init__(
60 taskname=taskname,workdir=workdir,outdir=workdir)
67 """The outer domain value for a multistorm."""
72 """The inner domain value for a multistorm."""
75 def change_location(self):
82 if 'location' in self:
83 fakestormid=self.conf.getstr(
'config',
'fakestormid')
84 this_stid=self.conf.getstr(
'config',
'STID')
85 self[
'location']=self[
'location'].replace(this_stid,fakestormid)
89 """!Placeholder to allow a ForecastWatcher to look like an
90 hwrf.fcsttask.WRFAtmos to the scripts.exhwrf_check_init job.
91 This allows the fake storms to run the init checking job."""
96 """Converts a WRFOutput to a Product."""
98 if cached
is not None:
return cached
101 outdir=self[
'outdir']
102 assert(outdir
is not None)
119 priority_stid= self.conf.getstr(
'config',
'multistorm_priority_sid')
120 multistorm_sids=self.conf.getstr(
'config',
'multistorm_sids').split()
121 this_stid=self.storminfo.stormid3
122 this_stid_idx=multistorm_sids.index(this_stid)
126 realstorm=self.conf.getstr(
'config',
'fakestormid'))
127 wrfprod=os.path.basename(wrfout.path())
133 if this_stid != priority_stid
and wrfout.domain().get_grid_id() != 1:
134 if wrfout.domain().get_grid_id() == 2:
135 self.
_d02map=
'd%02d'%(this_stid_idx*2+2)
136 assert(
'd02' in wrfprod)
137 wrfprod=wrfprod.replace(
'd02',self.
_d02map)
138 if wrfout.domain().get_grid_id() == 3:
139 self.
_d03map=
'd%02d'%(this_stid_idx*2+3)
140 assert(
'd03' in wrfprod)
141 wrfprod=wrfprod.replace(
'd03',self.
_d03map)
142 loc=os.path.join(fake_outdir,wrfprod)
144 loc=os.path.join(outdir,os.path.basename(wrfout.path()))
146 with self.dstore.transaction()
as t:
148 prodname=rel,location=loc)
150 uf[
'stream']=wrfout.stream()
152 if relocate: uf.location=loc
def run_exe(self, *args, **kwargs):
    """Do nothing and return None.

    Any positional or keyword arguments are accepted and ignored.
    NOTE(review): presumably overrides a parent-class step that is
    not needed for this task -- confirm against the base class.
    """
    return None
def link_fix(self, *args, **kwargs):
    """Do nothing and return None.

    Any positional or keyword arguments are accepted and ignored.
    NOTE(review): presumably overrides a parent-class step that is
    not needed for this task -- confirm against the base class.
    """
    return None
def make_namelist(self, *args, **kwargs):
    """Do nothing and return None.

    Any positional or keyword arguments are accepted and ignored.
    NOTE(review): presumably overrides a parent-class step that is
    not needed for this task -- confirm against the base class.
    """
    return None
161 def link_all_inputs(self,just_check=False):
163 if 'merge' in self.
inputs:
165 'merge',stormid=self.storminfo.stormid3,
166 just_check=just_check)
168 msg=
'Input type "merge" was not specified.'
169 self.
log().critical(msg)
171 okay=okay
and super(RealInit,self).link_all_inputs(just_check)
174 def add_orig_wrfinput(self,r):
178 """Adds an input source (via self.add_input) that will provide
179 the wrfinput output file from gdas_merge, or whatever the last
180 step in the real storm initialization may have been. The
181 given object must have a get_merge(stormid) function that
182 returns a Product for a given storm"""
184 self.storminfo.stormid3,r,self.sim.get_moad(),self.conf.cycle))
189 def deliver_products(self):
190 c=
'_' if self.sim.nocolons
else ':'
192 atime=
'{year}-{month}-{day}_{HH}'+c+
'00'+c+
'00'
193 met_nmm_time=
'{year}-{month}-{day}_{HH}'+c+
'00'+c+
'00.nc'
194 prefix=
'{com}/{vit[stormid3]}.{YMDH}.multistorm.'
195 stormid=self.storminfo.stormid3
196 files=[
'geo_nmm.d01.nc',
197 'geo_nmm_nest.l01.nc',
198 'geo_nmm_nest.l02.nc',
199 'met_nmm.d01.'+met_nmm_time,
205 'wrfinput_d01_'+stormid+
'gdas_merge.nc']
208 comfile=self.
icstr(prefix+f)
209 localfile=self.
icstr(f)
212 '%s: does not exist or is empty, cannot copy to %s'%(
214 workers.add_work(self.
_copy_one,[localfile,comfile])
216 def _copy_one(self,fromf,tof):
221 def __init__(self,ds,conf,section,wrf,realstorm,stormNinner,stormNouter,**kwargs):
222 super(FakeInit,self).__init__(ds,conf,section,**kwargs)
224 self.
colon=
'_' if wrf.get_nocolons()
else ':'
225 (self.stormNinner,self.stormNouter) = (stormNinner,stormNouter)
227 with ds.transaction()
as tx:
232 """!Returns the hwrf.wrf.WRFSimulation being run."""
237 def define_locations_from_com(self):
238 c=
'_' if self.sim.nocolons
else ':'
239 atime=
'{year}-{month}-{day}_{HH}'+c+
'00'+c+
'00'
240 met_nmm_time=
'{year}-{month}-{day}_{HH}'+c+
'00'+c+
'00.nc'
241 self.
geofile={1:self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.geo_nmm.d01.nc',
243 3:self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.geo_nmm_nest.l01.nc',
244 'geo_nmm_nest.l01.nc'),
245 9:self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.geo_nmm_nest.l02.nc',
246 'geo_nmm_nest.l02.nc') }
248 self.
metfile=self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.met_nmm.d01.'+met_nmm_time,
249 'met_nmm.d01.'+met_nmm_time)
250 self.
real_init=self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.wrfinput_d01')
251 self.
fort65=self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.fort.65')
252 self.
wrfbdy=self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.wrfbdy_d01')
256 multistorm_sids = self.conf.getstr(
257 'config',
'multistorm_sids',
'nosids').split()
258 for i1
in xrange(len(multistorm_sids)):
260 stormid=multistorm_sids[i1]
261 self.
wrfanl_for[self.stormNinner[
'storm%sinner'%i]]=\
262 self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.wrfanl_d03_'+atime,
263 'storm%sinner_wrfanl'%i,stormid=stormid)
264 self.
wrfanl_for[self.stormNouter[
'storm%souter'%i]]=\
265 self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.wrfanl_d02_'+atime,
266 'storm%souter_wrfanl'%i,stormid=stormid)
268 self.
productify(
'{realstormcom}/{realstorm}.{YMDH}.multistorm.wrfinput_d01_'+stormid+
'gdas_merge.nc',
269 'merge_%s'%(stormid,),stormid=stormid)
271 def productify(self,filename,prodname=None,stormid=None):
272 if stormid
is None: stormid=self.
realstorm
274 basename=os.path.basename(loc)
282 uf[
'basename']=basename
286 def get_merge(self,stormid):
294 """Returns the wrfinput file regardless of the time or
298 def wrfanl_at_time(self,atime,domain):
304 """Makes add_wrfinput work with this class."""
308 """Makes add_fort65 work with this class."""
312 """Makes add_wrfbdy work with this class."""
316 """Makes add_metgrid work with this class."""
317 assert(self.
metfile is not None)
318 if ftime==self.conf.cycle:
323 """!Makes add_geogrid work with this class.
324 @param domain the domain of interest
325 @bug this only works for nesting ratios that are a power of 3"""
326 ratio=domain.moad_ratio()
328 if logger
is None: logger=self.
log()
331 except KeyError
as ke:
332 logger.error(
'BLAH BLAH Requesting domain %s at nest level %d which we do not have from geogrid: %s: %s'%
333 (str(domain),ratio,str(self.
geofile),str(ke)),exc_info=
True)
342 def __init__(self,dstore,conf,section,wrf,keeprun=True,
343 wrfdiag_stream=
'auxhist1',**kwargs):
345 super(WRFAtmosMultiStorm,self).__init__(dstore,conf,section,wrf,
346 keeprun=keeprun,wrfdiag_stream=wrfdiag_stream,**kwargs)
349 """Runs set_ij_start (swcorner_dynamic) to generate the i & j
350 start locations for domain 2, then generates the namelist.
352 For the fakestorm of multistorm run, it generates the
353 i & j start locations for each of the outer storms domain
354 then generates the namelist."""
355 if logger
is None: logger=self.
log()
356 domlat=self.conf.getfloat(
'config',
'domlat')
357 domlon=self.conf.getfloat(
'config',
'domlon')
359 s=self.
wrf().swcorner_dynamic_multistorm(
360 self.
getexe(
'swcorner_dynamic'),self.conf.syndat_multistorm,
361 domlat,domlon,logger)
363 s=self.
wrf().swcorner_dynamic(self.
getexe(
'swcorner_dynamic'),
367 with open(filename,
'wt')
as nlin:
371 """This is a Task that copies WRF input and output files from the
372 WRF FAKE Storm run directory to the COM directory, for a Multi Storm Basin Scale."""
373 def __init__(self,dstore,conf,section,wrftask,out_prefix,**kwargs):
375 super(WRFCopyTaskMS,self).__init__(
376 dstore,conf,
'copywrf',wrftask,conf.getstr(
'config',
'out_prefix'))
383 """!Returns the hwrf.wrf.WRFSimulation being run."""
387 """Generates a full path to the delivery location of the
388 specified source file. Returns the full path and the basename
410 if(isinstance(orig,Product)):
411 thisprodname=orig.getprodname()
412 bn=os.path.basename(str(thisprodname))
414 bn=os.path.basename(str(orig))
421 '([0-9][0-9][0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])'
422 '[_.-]([0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])[^/]*$',
423 r'\1-\2-\3_\4:\5:\6',bn)
425 '([0-9][0-9][0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])'
426 '[_.-]([0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])[^/]*$',
427 r'\1-\2-\3_\4:\5:00',bn)
433 inname_colon=bn_colon,
434 inname_colon_s00=bn_colon_s00)
435 return ( os.path.join(self.
outdir,fullbn), fullbn )
437 def _deliver_to_group(self,group,inprod,check=None,destname=None):
438 """Do not call this function directly. It is the internal
439 implementation of d_initial and d_final. Call those functions
441 (comfile,combn)=self.
comfile(inprod,destname=destname)
442 if(isinstance(inprod,Product)):
444 if check
is None: check=
False
459 wrffile=os.path.join(self._wrftask.workdir,inprod)
465 realstorm=self.conf.getstr(
'config',
'fakestormid'))
468 if inprod ==
'track_d03.patcf' and self.wrf_watcher.d03remap:
469 inprod_d03remap= inprod.replace(
'd03',self.wrf_watcher.d03remap)
470 wrffile=os.path.join(fake_outdir,inprod_d03remap)
473 wrffile=os.path.join(fake_outdir,inprod)
476 wrffile=os.path.join(self._wrftask.location,inprod)
486 upstream=UpstreamFile(dstore=self.
dstore,prodname=combn,
487 category=
"%s-upstream"%(self.
taskname,),location=wrffile)
488 upstream.location=wrffile
495 group.append( (upstream,product,bool(check)) )
This module provides a set of utility functions to do filesystem operations.
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
Create namelists, monitor wrf simulations, generate filenames.
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
def _copy_one(self, fromf, tof)
def link_input(self, typenames, just_check=False, kwargs)
link or check for inputs
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
taskname
Read-only property: the name of this task.
The base class of tasks run by the HWRF system.
def check_all_inputs(self)
Placeholder to allow a ForecastWatcher to look like an hwrf.fcsttask.WRFAtmos to the scripts...
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
storminfo
The hwrf.storminfo.StormInfo describing the vitals information for the storm processed by this HWRFTa...
base class of classes that run wrf or real_nmm
def get_wrfinput_gfsinit_realinit(self)
def sim(self)
Returns the hwrf.wrf.WRFSimulation being run.
def wrf_watcher(self)
Returns the hwrf.wrf.WRFSimulation being run.
low-level wrf implementation, underlying hwrf.wrf
def wrfinput_at_time(self, atime, domain)
Base class of tasks run by HWRF.
def wrf(self)
Returns the WRFSimulation object used by this task.
outdir
The directory in which this task should deliver its final output.
out_prefix
Prefix to prepend to output filenames after the com path.
Contains the WorkPool class, which maintains pools of threads that perform small tasks.
def geodat
Makes add_geogrid work with this class.
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Stores products and tasks in an sqlite3 database file.
The uncoupled HWRF forecast Task.
Time manipulation and other numerical routines.
def comfile
get the full path to a file
workdir
The directory in which this task should be run.
def _set_cache(self, wrfout, uf)
Sets the cached produtil.datastore.UpstreamFile for the given wrfout.
This module provides two different ways to generate Fortran namelist files from HWRFConfig sections: ...
A pool of threads that perform some list of tasks.
def log
Obtain a logging domain.
monitors a running wrf simulation
inputs
a mapping of typename to a list of input objects
def _get_cache(self, wrfout)
Returns the product cached for the specified wrfout or None if not found.
def define_locations_from_com(self)
def add_input(self, typename, inobj)
add input of a specified type
Exceptions raised by the hwrf package.
def fort65_at_time(self, atime, domain)
def wrfbdy_at_time(self, atime, domain)
def add_wrfinput(self, r)
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
def met_at_time(self, ftime)