1 """This module implements FcstTask classes for the hwrf
2 multistorm basin-scale final merge task.
4 FinalMergeTask - Create a multistorm final merge domain 01 product,
5 from the required domain 01 information from storm in
30 __all__=[
'FinalMergeTask' ]
39 from produtil.fileop import realcwd, make_symlink, deliver_file, make_symlinks_in
48 """This is the abstract base class of anything that gets, or
49 creates, input files for the Final Merge without running another
50 Task. For example, something that copies the wrfinput output would
51 be a subclass of Input2FinalMerge"""
52 def __init__(self,src): self.src=src
54 """In subclasses, this function copies or links the input
55 files. This default implementation does nothing."""
57 def link_product(self,product,excclass,logger,target=None,
59 """Links the file from the given product to the target
60 location (basename(product.location) if no target is
61 provided). If the product is not yet available or has no
62 location, then the given exception class excclass is
64 assert(logger
is not None or not just_check)
66 (L,A)=(product.location,product.available)
68 if target
is None: target=os.path.basename(L)
72 elif logger
is not None:
73 msg=
'%s: file is empty or non-existent'%(L,)
76 make_symlink(L, target, logger=logger,force=
True)
78 msg=
'%s: unavailable (available=%s location=%s)'%(
79 str(product.did),repr(A),repr(L))
80 if logger
is not None: logger.warning(msg)
81 if just_check:
return False
88 """Links real_nmm gfsinit/realinit/wrfinput_d01 file or returns the FileProduct,
89 regardless of the time and domain, in the current directory for
91 def __init__(self,stormid,src):
92 super(WRFInput2FinalMerge,self).__init__(src)
98 def get_inputs(self,logger,just_check=False, **kwargs):
99 if self.
src is not None:
100 p=self.src.get_wrfinput(self.
stormid)
104 target=
'wrfinput_d01',
105 just_check=just_check)
109 """Links gdas_merge/wrfinput_d01 file or returns the FileProduct,
110 regardless of the time and domain, in the current directory for
112 def __init__(self,stormid,src):
113 super(Merge2FinalMerge,self).__init__(src)
117 return 'wrfinput_d01_'+self.
stormid+
'gdas_merge.nc'
118 def get_inputs(self,logger,just_check=False, stormid=None, **kwargs):
119 if stormid
is not None and stormid!=self.
stormid:
120 if logger
is not None:
122 'Wrong stormid requested: %s instead of %s, cannot link.'
125 if self.
src is not None:
127 p=self.src.get_merge(self.
stormid)
130 target=tgt,just_check=just_check)
136 """This is a FcstTask since it prepares input to a forecast model.
137 The FcstTask class provides some methods required by the final merge task.
138 This task merges all the domain 01 files from the relocation.Merge(RelocationTask)
139 of each storm and produces a single d01 merged file that is comprised of all the
140 storms in a multistorm."""
142 def __init__(self,dstore,conf,section,taskname=None,**kwargs):
144 super(FinalMergeTask,self).__init__(dstore,conf,section,taskname=taskname,
150 self.
real_stormids = conf.getstr(
'config',
'multistorm_sids').split()
154 "Multistorm run: ERROR Can only run "
155 "finalmerge on the fakestorm of a multistorm run.")
169 with dstore.transaction()
as t:
189 def update_state(self):
192 def wrfinput_at_time(self,atime,domain):
196 """Performs all work needed to run the Final Merge program and create
197 the final merged product. Creates the work directory, CD's to it,
198 creates a domain mapping dictionary for all the storms, and runs link_all_inputs,
199 run_exe, and deliver_products."""
207 self.
postmsg(
'Final Merge Task running in directory: '+os.getcwd())
214 if os.path.exists(runhere):
215 logger.warning(
'%s: directory exists; will delete'%(runhere,))
216 assert(
not os.path.samefile(self.
getdir(
'WORKhwrf'),runhere))
217 shutil.rmtree(runhere)
220 keep_on_error=
True)
as rundir:
223 logger.info(
'%s running in directory %s'%(
239 self._prod_wrfinput_d01_finalmerge.available=
True
241 except Exception
as e:
243 logger.critical(
'%s failed: %s'%(self.
taskname,str(e)),
260 """Runs the hwrf_diffwrf_3dvar program to convert all the required
261 wrfinput_d01 files from netcdf to binary.
263 returns a tuple (gfsrealinit_bin, gdas_merge_bins)
264 with the binary filenames that will be used
265 as input for the final merge executable.
267 self.
log().info(
'convert_wrfinput_d01_2bin')
274 commonWORKhwrf = self.
getdir(
'WORKhwrf')[:-4]
279 gfsrealinit_nclinkname=
'wrfinput_d01_'+self.
priority_stormid+
'gfsrealinit.nc'
280 gfsrealinit_ncfile=os.path.join(self.
workdir,
'wrfinput_d01')
282 make_symlink(gfsrealinit_ncfile,gfsrealinit_nclinkname,force=
True, logger=logger)
284 fin_target = gfsrealinit_nclinkname
285 fout_target = gfsrealinit_bin
288 log =
'%s/logs/%s_%s_%s.log' %(
290 cmd =
produtil.run.exe(prog)[
'storm_relocate', fin_target,
'flnm3', fout_target]
300 gdasmerge_nclinkname = os.path.join(self.
workdir,
'wrfinput_d01_'+stormid+
'gdas_merge.nc')
307 fin_target = os.path.basename(gdasmerge_nclinkname)
308 fout_target =
'wrfinput_d01_'+stormid+
'gdas_merge.bin'
309 gdas_merge_bins.append(fout_target)
311 log =
'%s/logs/%s_%s_%s.log' %(
312 self.
dest_dir, self.__class__.__name__, fprog,
'gdas_merge2bin'+stormid)
314 cmd =
produtil.run.exe(prog)[
'storm_relocate', fin_target,
'flnm3', fout_target]
318 return gfsrealinit_bin, gdas_merge_bins
321 """Runs the hwrf_diffwrf_3dvar program to convert the final
322 merge binary file to netcdf."""
323 self.
log().info(
'convert_wrfinput_d01_2bin')
331 make_symlink(self.
fmprodname+
'.finalmerge.bin',
'fort.56',force=
True, logger=logger)
334 fin_target =
'fort.56'
338 log =
'%s/logs/%s_%s_%s.log' %(
348 """Runs the hwrf-utilities hwrf_final_merge program with
349 the following inputs and generates the final merge output file,
350 all in this tasks self.workdir .
352 gfsrealinit_bin - the name of the d01 priority storm
353 gfsrealinit binary filename
355 gdasmerge_bins - a list of gdas_merge d01 binary
356 filenames for each real storm"""
358 self.
log().info(
'final_merge')
359 fprog =
'hwrf_final_merge'
361 prog = self.
getexe(
'hwrf_final_merge')
375 ins = { 40:gfsrealinit_bin }
376 for i, gdas_merge_bin
in enumerate(gdasmerge_bins):
377 ins[i+41] = gdas_merge_bin
380 ous = { 56:self.
fmprodname+
'.finalmerge.bin' }
382 self.
run_ext(fprog, echo=evars, inputs=ins, outputs=ous)
385 """Iterates over all products generated by this task."""
392 """Adds an input source (via self.add_input) that will provide
393 the wrfinput output file from real_nmm. The given object must
394 have a wrfinput_at_time(atime,domain) function that returns a
395 Product for a given analysis time and WRFDomain object.
402 """Adds an input source (via self.add_input) that will provide
403 the wrfinput output file from a prior gdas_merge, real_nmm or
404 relocation. This is used for the storm's final output of the
405 per-storm initialization."""
409 frominfo=
None,**kwargs):
410 """Delivers products to intercom via Product.deliver. Any
411 keyword arguments are passed on to Product.deliver. By
412 default, keep=False, which means the local copy of the file
413 may no longer exist. If frominfo is specified, it will be
415 if logger
is None: logger=self.
log()
416 logger.warning(
'Delivering products for %s'%(self.
taskname,))
426 bloc=os.path.basename(loc)
427 if os.path.exists(bloc):
428 logger.warning(
'%s: deliver product from ./%s'%(p.did,bloc))
429 p.deliver(frominfo=bloc,keep=keep,logger=logger,**kwargs)
431 logger.warning(
'%s: ./%s does not exist. Cannot deliver.'
433 if missing
is not None:
457 """Copies, or makes, one or more input files."""
460 def namer(p,logger,*args):
return names[p]
461 def actor(p,name,logger,*args): action[p](p,name,logger,*args)
473 def _make_plist_and_names(self):
474 """This is an internal implementation function that should not
475 be called directly. It returns a three-element tuple
476 containing a list of products, and a dict mapping from product
477 to the local filename, and a dict mapping from product to the
478 copy method. This is used to implement copy_inputs, to copy
479 input files to the local directory from remote tasks'
481 def copier(p,name,logger,*args):
482 deliver_file(p.location,name,logger=logger,keep=
True)
483 def linker(p,name,logger,*args):
484 make_symlink(p.location,name,force=
True,logger=logger)
501 for merge
in self.
inputs[
'merge']:
502 mergeprod=merge.src.get_merge(merge.stormid)
504 tgtname=merge.tgtname
505 self.
log().info(
'Product %s location %s going to %s'%(
506 mergeprod.did,mergeprod.location,tgtname))
507 names[mergeprod]=tgtname
511 plist=[ k
for k
in names.iterkeys() ]
512 actions=dict( (n,copier)
for n
in names.iterkeys() )
513 return ( plist, names, actions )
521 def _fake_merge_file(self, sleep_secs):
522 """Create a link to a phony final merge file for initial dev testing,
523 without having to run the final merge executable."""
527 logger.info(
'fake_merge_file going to sleep for %s seconds'%sleep_secs)
528 time.sleep(sleep_secs)
536 fmfile = os.path.join(
'../finalmerge.t',self.
fmprodname)
538 make_symlink(fmfile,fmlinkname,force=
True, logger=logger)
541 def run_ext(self, cmd, echo=None, inputs=None, incopies=None,
542 outputs=
None, opt_outputs=
None):
543 """Run an external command linking in fort.X files for input
544 and output. If redirect=True, redirect logs to a separate file.
546 It will use "self.getexe()" on the command to find the
547 external program to execute.
549 If a list is passed in as the echo variable, then /bin/echo
550 will be run with that list and piped to the external command.
552 The inputs dictionary should consist of a fortran file
553 number and the source file, such as:
554 inputs = {11:tcvitals, 12:wrfout_d01}
555 would produce symbolic links:
557 fort.12 -> wrfout_d01
558 input files can also be copied using incopies:
559 incopies = {11:tcvitals, 12:wrfout_d01}
560 would create files instead of links.
562 The outputs and opt_outputs (optional outputs) should be the same kind of
563 dictionary as the inputs. As in:
564 outputs = {56:new_data_4x, 85:storm_radius}
565 this would mean the "fort.56" file would be renamed to "new_data_4x"
566 and the "fort.85" renamed to "storm_radius".
568 If opt_outputs is given then the fortran file is tested to see if it
569 exists and only if it does is it renamed to the output filename.
571 A log file will be created consisting of the stdout and stderr of the
572 command run. It will be named consisting of the taskname and command.
573 For example, if this is relocation stage 1 and the command is
574 hwrf_pert_ct then the log file is "rel_stage_1_hwrf_pert_ct.log" """
578 prog = self.
getexe(cmdname)
579 logf =
'%s/logs/%s_%s.log' %(self.
dest_dir,
580 self.__class__.__name__, cmdname)
586 if isinstance(s,float): echostr+=
"%g "%(s,)
587 elif isinstance(s,int): echostr+=
"%d "%(s,)
588 else: echostr+=
"%s "%(str(s),)
590 'Converted %s to %s for stdin input to fortran command.'
591 %(repr(echo),repr(echostr)))
602 if inputs
is None: inputs=empty
603 if outputs
is None: outputs=empty
604 if incopies
is None: incopies=empty
605 iof = dict(itertools.chain(inputs.items(), outputs.items(),
616 only_log_errors=
True, logger=logger)
619 logger.warning(repr(cmd))
624 for k, v
in outputs.iteritems():
626 if os.path.exists(ffile):
627 deliver_file(ffile, v, keep=
False,logger=logger)
629 logger.error(
'%s: did not make file %s (would mv to %s)'
630 %(cmdname,ffile,str(v)))
634 for k, v
in opt_outputs.iteritems():
635 ffile =
'fort.' + str(k)
636 if os.path.exists(ffile):
637 deliver_file(ffile, v, keep=
False,logger=logger)
640 '%s: did not make file %s (would mv to %s).'
641 %(cmdname,ffile,str(v)))
644 for k,v
in inputs.iteritems():
645 if os.path.islink(
'fort.'+str(k)):
646 logger.info(
'%s: deleting input fort file (symlink to %s)'
651 for k,v
in incopies.iteritems():
652 if os.path.exists(
'fort.'+str(k)):
653 logger.info(
'%s: deleting input fort file (copy of %s)'
662 def run_exe(self,exename='final_merge',runner=None,sleeptime=None):
663 """Runs the executable this task is responsible for running.
664 Determines if the program ran correctly. The exename is the
665 name of the argument in the [exe] section of the
668 runner=Runner - pass a produtil.prog.Runner object if
669 desired. This overrides any decision of what to run: the
670 exename will be ignored, and whatever is supplied in runner
671 is simply passed to produtil.run.run.
672 sleeptime - passed to produtil.run.run to determine how
673 often to check the child process. By default, the sleeptime
674 option in this task's config section is used, or if that is
675 absent, 30 seconds."""
676 if sleeptime
is None:
683 logger.info(
'%s: exit status %d'%(exename,stat))
684 if not check_last_lines(
'rsl.out.0000',
'SUCCESS COMPLETE',
686 msg=
'%s: did not see SUCCESS COMPLETE in rsl.out.0000'%(exename,)
688 raise RealNMMError(msg)
690 logger.info(
'%s: SUCCESS COMPLETE in rsl.out.0000'%(exename,))
Change directory, handle temporary directories.
This module provides a set of utility functions to do filesystem operations.
Imitates the shell "ls -l" program.
abstract base class of anything that runs or prepares input for a forecast model
Raised when one tries to use an invalid string for an option name.
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
_prod_wrfinput_d01_finalmerge
def redirect(self)
Should subprograms' outputs be redirected to separate files?
taskname
Read-only property: the name of this task.
A subclass of Product that represents file delivery.
def wait_for_products
Waits for products to be available and performs an action on them.
def remove_file
Deletes the specified file.
def final_merge(self, gfsrealinit_bin, gdasmerge_bins)
def fortcopy(forts, basedir=None, logger=None, only_log_errors=False, kwargs)
A convenience function for copying files to local fort.N files for various integers N using deliver_f...
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
def run(arg, logger=None, sleeptime=None, kwargs)
Executes the specified program and attempts to return its exit status.
def openmp
Sets the number of OpenMP threads for the specified program.
def add_merge(self, r, stormid)
A shell-like syntax for running serial, MPI and OpenMP programs.
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
A piece of data produced by a Task.
outdir
The directory in which this task should deliver its final output.
Runs the real_nmm or wrf executables.
Stores products and tasks in an sqlite3 database file.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def makedirs
Make a directory tree, working around filesystem bugs.
def fortlink
This is a convenience routine that makes many symbolic links to fort.N files for various integers N u...
workdir
The directory in which this task should be run.
def convert_wrfinput2bin(self)
def conffloat
Alias for self.conf.getfloat for section self.section.
def log
Obtain a logging domain.
def add_wrfinput(self, r, stormid)
inputs
a mapping of typename to a list of input objects
Contains the Listing class, which emulates "ls -l".
def _make_plist_and_names(self)
def add_input(self, typename, inobj)
add input of a specified type
Exceptions raised by the hwrf package.
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
def deliver_products(self, missing=None, logger=None, keep=False, frominfo=None, kwargs)
def products(self, kwargs)
def finalmerge2netcdf(self)
def bigexe(name, kwargs)
Alias for exe() for backward compatibility.