1 """This module will one day contain the implementation of the HyCOM
2 initialization and forecast jobs."""
4 import re, os, glob, datetime
13 from hwrf.numerics import to_datetime,to_datetime_rel,TimeArray
16 return 'YES' if x
else 'NO'
19 """Generates a produtil.prog.Runner for running a Hycom ksh
20 script. Adds a bunch of variables from config sections."""
21 there=task.confstrinterp(path)
24 for k,v
in task.conf.items(task.confstr(
'strings',
'hycomstrings')):
26 for k,v
in task.conf.items(task.confstr(
'bools',
'hycombools')):
28 RTOFSDIR=task.meta(
'RTOFSDIR',
'')
30 vars[
'RTOFSDIR']=RTOFSDIR
33 def read_RUNmodIDout(path):
35 with open(path,
'rt')
as f:
37 m=re.match(
'^export RUNmodIDout=(.*)$',line)
39 RUNmodIDout=m.groups()[0]
43 def remove_ocean(): self.uncouple()
45 def __init__(self,dstore,conf,section,taskname=None,fcstlen=126,
47 super(HYCOMInit,self).__init__(dstore,conf,section,
48 taskname=taskname,**kwargs)
55 """Initializes all Product objects to make them available to
66 fhrs=range(int(self.
fcstlen+24.001))
68 atime=to_datetime(self.conf.cycle)
69 ftimes=[to_datetime_rel(t*3600,atime)
for t
in fhrs]
73 prodnameA=self.
timestr(
'hwrf_basin.{fahr:03d}.a',ftime,atime)
74 filepathA=self.
icstr(
'{com}/{out_prefix}.{pn}',pn=prodnameA)
75 prodnameB=self.
timestr(
'hwrf_basin.{fahr:03d}.b',ftime,atime)
76 filepathB=self.
icstr(
'{com}/{out_prefix}.{pn}',pn=prodnameB)
90 for what
in (
'restart_out',
'restart_outR'):
99 ffiles=[
'airtmp',
'precip',
'presur',
'radflx',
'shwflx',
'surtmp',
100 'tauewd',
'taunwd',
'vapmix',
'wndspd']
103 file=
'forcing.%s.%s'%(ffile,ab)
108 logger.debug(
'%s => %s (%s)'%(file,comf,repr(prod)))
116 self.
confstrinterp(
'{com}/{out_prefix}.standalone.blkdat.input'))
119 """Fills the ocean status files with information. This is
120 called from exhwrf_ocean_init after a successful call to the
121 run() function below. The ocstatus argument is the
122 hwrf.coupling.OceanStatus object that fills the files."""
128 'step3_worked='+(
'YES' if self.
step3_worked else 'NO'), ])
130 """Reads the ocean status back in during the forecast and
131 check_init jobs, filling the self.run_coupled,
132 self.forecast_exe and self.step3_worked variables."""
134 for filename
in ocstatus.fileiter():
138 lines=ocstatus.read(logger)
141 m=re.match(
'^ *([^ =]+) *= *(.*?) *$',line)
143 logger.warning(
'%s: unparseable ocstatus line: %s'
146 (key,value)=m.groups()
151 if key==
'RUN_COUPLED':
155 logger.warning(
'%s: ignoring unrecognized RUN_COUPLED value %s'%(
156 filename,repr(value)))
158 elif key==
'step3_worked': self.
step3_worked=value==
'YES'
160 logger.warning(
'%s: ignoring unkown key %s'%(filename,key))
163 """!Fills the RTOFS staging area with RTOFS data."""
168 rtofs_atime=datetime.datetime(cyc.year,cyc.month,cyc.day,0)
169 rtofs_ymd=rtofs_atime.strftime(
'%Y%m%d')
172 tardir=self.
confstr(
'RTOFS_TAR',
'/dev/null')
173 histdir=self.
confstr(
'RTOFS_HIST',
'/dev/null')
174 fcstdir=self.
confstr(
'RTOFS_FCST',
'/dev/null')
177 outdir=self.
confstr(
'RTOFS_STAGE',
'')
179 outdir=os.path.join(self.
workdir,
180 rtofs_atime.strftime(
'rtofs.%Y%m%d'))
184 with
NamedDir(outdir,keep=
True,logger=logger,rm_first=
False)
as d:
185 parmin=self.
confstrinterp(
'{PARMhwrf}/hwrf_get_rtofs.nml.in')
186 parmout=
'get_rtofs.nml'
187 with open(parmin,
'rt')
as inf:
188 with open(parmout,
'wt')
as outf:
189 outf.write(ni.parse(inf,logger,parmin,atime=rtofs_atime))
190 checkrun(mpirun(mpi(self.
getexe(
'hwrf_get_rtofs')),allranks=
True),
194 """Runs the hycom initialization. Raises an exception if
195 something goes wrong. Returns on success."""
208 logger=logger,rm_first=
True)
as d:
210 checkrun(
scriptexe(self,
'{USHhwrf}/hycom/select_domain.sh'),
212 self.hycom_settings.deliver(frominfo=
'./hycom_settings')
217 checkrun(
scriptexe(self,
'{USHhwrf}/hycom/create_bc_ic.sh'),
221 RUNmodIDout=read_RUNmodIDout(
'./hycom_settings')
223 RUNmodIDout=RUNmodIDout)
229 for (ftime,prod)
in self.init_file2a.iteritems():
230 fromloc=prod.prodname
231 prod.deliver(frominfo=fromloc,keep=
True,logger=logger)
234 for (ftime,prod)
in self.init_file2b.iteritems():
235 fromloc=prod.prodname
236 prod.deliver(frominfo=fromloc,keep=
True,logger=logger)
239 for(prodname,prod)
in self.restart_out.iteritems():
240 (local,ab)=prodname.split(
'.')
241 loc=self.
icstr(
'{'+local+
'}',ab=ab,RUNmodIDout=RUNmodIDout)
242 prod.deliver(location=loc,frominfo=prodname,
243 keep=
True,logger=logger)
246 notab=self.conf.cycle.strftime(
'archv.%Y_%j_%H.')
248 loc=self.
icstr(
'{spin_archv}',ab=ab,RUNmodIDout=RUNmodIDout)
249 self.spin_archv_a.deliver(
250 frominfo=notab+ab,location=loc,keep=
True,
254 checkrun(
scriptexe(self,
'{USHhwrf}/hycom/init_step3.sh'),
258 for (name,prod)
in self.forcing_products.iteritems():
259 prod.deliver(frominfo=
'./'+name)
261 self.limits.deliver(frominfo=
'./limits')
263 self.blkdat_input.deliver(frominfo=
'./blkdat.input')
268 except Exception
as e:
269 logger.error(
'Unhandled exception in ocean init: %s'
270 %(str(e),),exc_info=
True)
278 def __init__(self,hycomfcst,ocstatus):
283 return self.hycomfcst.hycominit
285 """This subroutine is run by the check_init job and checks to
286 see if the initialization has succeeded. It returns True if
287 the inputs are all present, and False if they're not."""
290 hi.recall_ocstatus(self.
ocstatus,logger)
291 if not hi.run_coupled:
292 logger.warning(
'Hycom init says we will not run coupled.')
299 for (ftime,prod)
in hi.init_file2a.iteritems():
302 for (ftime,prod)
in hi.init_file2b.iteritems():
315 if not prod.available:
316 logger.error(
'%s: product not available'%(prod.did,))
317 elif not prod.location:
318 logger.error(
'%s: no path set in database'%(prod.did,))
319 elif not os.path.exists(prod.location):
320 logger.error(
'%s: %s: file does not exist'%(
321 prod.did,prod.location))
323 logger.info(
'%s: %s: product is delivered'%(
324 prod.did,prod.location))
327 logger.error(
'Have only %d of %d products. Ocean init failed.'%(
331 logger.info(
'Rejoice: we have all coupled inputs')
334 """Called from the forecast job. If just_check=True, this
335 calls check_coupled_inputs. Otherwise, this links all hycom
336 inputs to the local directory."""
339 hi=self.hycomfcst.hycominit
340 hi.recall_ocstatus(self.
ocstatus,logger)
341 if not hi.run_coupled:
342 logger.warning(
'Hycom init says we will not run coupled.')
349 for (ftime,prod)
in hi.init_file2a.iteritems():
352 for (ftime,prod)
in hi.init_file2b.iteritems():
361 cycle=self.hycominit.conf.cycle
363 if not prod.available
or not prod.location
or \
364 not os.path.exists(prod.location):
365 msg=
'%s: input not present (location=%s available=%s)'\
366 %(prod.did, repr(prod.location), repr(prod.available))
369 m=re.match(
'hwrf_basin.0*(\d+).([ab])',prod.prodname)
371 make_symlink(prod.location,prod.prodname,
372 logger=logger,force=
True)
376 logger.info(
'Link hour %s relative to cycle %s.'%(repr(hr),repr(cycle)))
378 name1=t.strftime(
'nest/archv.%Y_%j_%H.')+ab
379 name2=t.strftime(
'incup/incupd.%Y_%j_%H.')+ab
380 for name
in ( name1, name2 ):
381 make_symlink(prod.location,name,
382 logger=logger,force=
True)
385 op=hi.icstr(
'{out_prefix}.')
388 for part
in [
'rtofs',
'limits',
'forcing' ]:
389 globby=hi.icstr(
'{com}/{out_prefix}.{part}*',part=part)
396 for path
in glob.glob(globby):
399 localname=path[ipref+nop:]
401 finalname=re.sub(
'.*\.restart\.([^/]*)',
'restart_in.\\1',localname)
402 make_symlink(path,finalname,force=
True,logger=logger)
403 logger.info(
'%s: %d files linked\n',globby,nfound)
435 hsprod=self.hycominit.hycom_settings
436 assert(hsprod
is not None)
437 assert(hsprod.available)
438 assert(hsprod.location)
439 RUNmodIDout=read_RUNmodIDout(hsprod.location)
444 def link_hycom_parm(self,RUNmodIDout):
445 logger=self.hycominit.log()
446 mine={
'fcst.blkdat.input':
'blkdat.input',
447 'patch.input.90':
'patch.input',
448 'ports.input':
'ports.input' }
449 for (parmbase,localname)
in mine.iteritems():
450 parmfile=self.hycominit.icstr(
451 '{PARMhwrf}/hwrf_{RUNmodIDout}.basin.{PARMBASE}',
452 RUNmodIDout=RUNmodIDout,PARMBASE=parmbase)
455 def link_hycom_fix(self,RUNmodIDout):
457 logger=self.hycominit.log()
458 globby=self.hycominit.icstr(
'{FIXhycom}/hwrf_{RUNmodIDout}.basin.*',
459 RUNmodIDout=RUNmodIDout)
460 forcewanted=set([
'chl.a',
'chl.b',
'kpar.a',
'kpar.b',
'offlux.a',
'offlux.b',
'rivers.a',
'rivers.b'])
463 for path
in glob.glob(globby):
464 basename=os.path.basename(path)
465 fd=basename.find(
'forcing')
469 bd=basename.find(
'basin.')
474 forcetype=basename[(fd+8):]
475 if forcetype
in forcewanted:
479 logger.info(
'%s: not linking %s'%(basename,path))
482 logger.info(
'Linked %d of %d HyCOM fix files for RUNmodIDout=%s'%(
483 nlinked,n,repr(RUNmodIDout)))
488 """Returns an MPIRanksBase to run the executable chosen by the
489 initialization. This function must only be called after
490 link_coupled_inputs"""
491 if not isinstance(ranks,int):
492 raise TypeError(
'The ranks argument to make_exe must be an int. You provided a %s %s'%(type(ranks).__name__,repr(ranks)))
493 wantexe=task.hycominit.forecast_exe
496 'The forecast_exe option was not specified in the '
497 'ocean status file.')
498 return mpi(wantexe)*ranks
501 """This subclass of CoupledWRF runs the HyCOM-coupled WRF."""
502 def __init__(self,dstore,conf,section,wrf,ocstatus,keeprun=True,
503 wrfdiag_stream=
'auxhist1',hycominit=
None,**kwargs):
504 if not isinstance(hycominit,HYCOMInit):
506 'The hycominit argument to WRFCoupledHYCOM.__init__ must be a '
507 'HYCOMInit object. You passed a %s %s.'%
508 (type(hycominit).__name__,repr(hycominit)))
509 super(WRFCoupledHYCOM,self).__init__(dstore,conf,section,wrf,keeprun,
510 wrfdiag_stream,**kwargs)
514 self.
couple(
'coupler',
'hwrf_wm3c',
'wm3c_ranks',1)
515 self.
couple(
'hycom',
'hycom',
'ocean_ranks',90,hycominiter)
516 self.
couple(
'wrf',
'wrf',
'wrf_ranks')
520 """Returns the HYCOMInit object."""
524 """Runs the ocean post-processor on the HyCOM output, in parallel
526 def __init__(self,ds,conf,section,fcstlen,hycom,**kwargs):
527 super(HYCOMPost,self).__init__(ds,conf,section,**kwargs)
531 """Called from the ocean post job to run the HyCOM post."""
535 checkrun(
scriptexe(self,
'{USHhwrf}/hycom/ocean_post.sh'),
538 except Exception
as e:
540 logger.error(
"Ocean post failed: %s"%(str(e),),exc_info=
True)
543 """Called from the unpost job to delete the HyCOM post output
544 in preparation for a rerun of the entire post-processing for
549 checkrun(
scriptexe(self,
'{USHhwrf}/hycom/ocean_unpost.sh'),
552 except Exception
as e:
554 logger.error(
"Ocean post failed: %s"%(str(e),),exc_info=
True)
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
def check_coupled_inputs(self, logger)
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
def link_hycom_parm(self, RUNmodIDout)
def to_datetime_rel(d, rel)
Converts objects to a datetime relative to another datetime.
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
The base class of tasks run by the HWRF system.
conf
This HWRFTask's hwrf.config.HWRFConfig object.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def make_exe(self, task, exe, ranks)
def link_hycom_fix(self, RUNmodIDout)
def fill_ocstatus(self, ocstatus, logger)
def find_rtofs_data(self)
Fills the RTOFS staging area with RTOFS data.
section
The confsection in self.section for this HWRFTask (read-only)
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
Stores products and tasks in an sqlite3 database file.
def check_coupled_inputs(self, logger)
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
Time manipulation and other numerical routines.
workdir
The directory in which this task should be run.
def timestr(self, string, ftime, atime=None, section=None, kwargs)
Expands a string in the given conf section, including time vars.
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
def log
Obtain a logging domain.
Insert config file data into a Fortran namelist file.
A time-indexed array that can only handle equally spaced times.
Raised when the ocean init did not produce some expected outputs.
Exceptions raised by the hwrf package.
def confstr
Alias for self.conf.getstr for section self.section.
def link_coupled_inputs(self, just_check, logger)
def recall_ocstatus(self, ocstatus, logger)
def icstr(self, string, section=None, kwargs)
Expands a string in the given conf section.
def scriptexe(task, path)
def make_symlink
Creates a symbolic link "target" that points to "source".