HWRF  trunk@4391
finalmergetask.py
1 """This module implements FcstTask classes for the hwrf
2 multistorm basin scale multistorm final merge task.
3 
4  FinalMergeTask - Create a multistorm final merge domain 01 product,
5  from the required domain 01 information from storm in
6  a multistorm hwrf run.
7 
8 """
9 
10 # Frimel, DTC This module was taken, copied, templated and modeled
11 # from relocate.py and fcsttask.py - Thank You !
12 
13 # This FinalMergeTask class:
14 # Currently meant to run on a storm that has no ocean coupling.
15 # Currently No input checking is performed, assumes gfs realinit and gdas_merge task
16 # for each storm is complete and successful.
17 
18 # 1. Get the list of storms .
19 # 2. set the priority storm pstorm, from the storm id list.
20 # 3. link to the gfsinit/realinit/wrfinput_d01 of the priority pstorm
21 # 4. link the /gdas_merge/wrfinput_d01 of the priority storm
22 # 5. For each additional storm link to the gdas_merge/wrfinput_d01 from each storm.
23 # 6. these are all links in the final_merge workdir back to each storm
24 # 7. Convert the files from netcdf to binary
25 # 8. Perform Final Merge
26 # 9. Convert the merged file from binary to netcdf
27 # 10 Deliver the final merge netcdf file to this tasks outdir.
28 
29 
30 __all__=[ 'FinalMergeTask' ]
31 
32 import os, shutil
33 import hwrf.fcsttask
36 import itertools
37 
38 from produtil.cd import NamedDir
39 from produtil.fileop import realcwd, make_symlink, deliver_file, make_symlinks_in
40 from produtil.datastore import FileProduct, COMPLETED, RUNNING, FAILED
41 
42 from produtil.run import alias, bigexe, exe, checkrun
43 from hwrf.fcsttask import FcstTask
44 from hwrf.exceptions import WRFInputMissing
45 
46 # Modeled after fcsttask.Input2Fcst(Object).
47 class Input2FinalMerge(object):
48  """This is the abstract base class of anything that gets, or
49  creates, input files for the Final Merge without running another
50  Task. For example, something that copies the wrfinput output would
51  be a subclass of Input2FinalMerge"""
52  def __init__(self,src): self.src=src
53  def get_inputs(self,just_check=False,**kwargs):
54  """In subclasses, this function copies or links the input
55  files. This default implementation does nothing."""
56  return True
57  def link_product(self,product,excclass,logger,target=None,
58  just_check=False):
59  """Links the file from the given product to the target
60  location (basename(product.location) if no target is
61  provided). If the product is not yet available or has no
62  location, then the given exception class excclass is
63  raised."""
64  assert(logger is not None or not just_check)
65  assert(isinstance(product,produtil.datastore.Product))
66  (L,A)=(product.location,product.available)
67  if L and A:
68  if target is None: target=os.path.basename(L)
69  if just_check:
70  if isnonempty(L):
71  return True
72  elif logger is not None:
73  msg='%s: file is empty or non-existent'%(L,)
74  logger.warning(msg)
75  return False
76  make_symlink(L, target, logger=logger,force=True)
77  return True
78  msg='%s: unavailable (available=%s location=%s)'%(
79  str(product.did),repr(A),repr(L))
80  if logger is not None: logger.warning(msg)
81  if just_check: return False
82  raise excclass(msg)
83 
84 #######################################################################################
85 
86 # Modeled after fcsttask.WFRInput2WRF(Input2Fcst).
88  """Links real_nmm gfsinit/realinit/wrfinput_d01 file or returns the FileProduct,
89  regardless of the time and domain, in the current directory for
90  a final merge."""
91  def __init__(self,stormid,src):
92  super(WRFInput2FinalMerge,self).__init__(src)
93  self.stormid=stormid
94  @property
95  def tgtname(self):
96  #return 'wrfinput_d01_'+self.stormid+'realinit.nc'
97  return 'wrfinput_d01'
98  def get_inputs(self,logger,just_check=False, **kwargs):
99  if self.src is not None:
100  p=self.src.get_wrfinput(self.stormid)
101  tgt=self.tgtname
102  if p:
103  return self.link_product(p,WRFInputMissing,logger,
104  target='wrfinput_d01',
105  just_check=just_check)
106  return False
107 
109  """Links gdas_merge/wrfinput_d01 file or returns the FileProduct,
110  regardless of the time and domain, in the current directory for
111  a final merge."""
112  def __init__(self,stormid,src):
113  super(Merge2FinalMerge,self).__init__(src)
114  self.stormid=stormid
115  @property
116  def tgtname(self):
117  return 'wrfinput_d01_'+self.stormid+'gdas_merge.nc'
118  def get_inputs(self,logger,just_check=False, stormid=None, **kwargs):
119  if stormid is not None and stormid!=self.stormid:
120  if logger is not None:
121  logger.error(
122  'Wrong stormid requested: %s instead of %s, cannot link.'
123  %(stormid,self.stormid))
124  return False
125  if self.src is not None:
126  tgt=self.tgtname
127  p=self.src.get_merge(self.stormid)
128  if p:
129  return self.link_product(p,WRFInputMissing,logger,
130  target=tgt,just_check=just_check)
131  return False
132 
133 #######################################################################################
134 
136  """This is a FcstTask since it prepares input to a forecast model.
137  The FcstTask class provides some methods required by the final merge task.
138  This task merges all the domain 01 files from the relocation.Merge(RelocationTask)
139  of each storm and produces a single d01 merged file that is comprise of all the
140  storms in a multstorm."""
141 
142  def __init__(self,dstore,conf,section,taskname=None,**kwargs):
143 
144  super(FinalMergeTask,self).__init__(dstore,conf,section,taskname=taskname,
145  **kwargs)
146  #taskname=self.taskname
147  # Just making sure.
148  if self.isfakestorm:
149  self.fake_stormid = conf.getstr('config','fakestormid')
150  self.real_stormids = conf.getstr('config','multistorm_sids').split()
151  self.priority_stormid = conf.getstr('config','multistorm_priority_sid')
152  else:
154  "Multistorm run: ERROR Can only run "
155  "finalmerge on the fakestorm of a multistorm run.")
156 
157  self.dest_dir=self.getdir('WORKhwrf')
158 
159  # Define the product (filename) this task is creating,
160  # in the Products Table of the database.
161  # The final merge file will be specified in the intercom directory since it will
162  # be shared by other storms.
163 
164  # TODO: CRITICAL, For now keeping the finalmerged file filename at wrfinput_d01.
165  # which starts out as the copy of <prioritySID>/gfsinit/realinit/wrfinput_d01
166  # in finalmerge/wrfinput_d01 but ends up being the "final merged" file.
167  self.fmprodname = 'wrfinput_d01'
168 
169  with dstore.transaction() as t:
171  self.taskname, location=os.path.join(self.outdir,self.fmprodname))
172  # Sam says:
173  # Use self.merge_inputs and self.real_input to store locations of
174  # files in other storms' workflows. I don't know where in this class
175  # you grab the wrfinput files, but you would have to change it to grab
176  # the files from the self.merge_inputs and self.real_input
177  # self.merge_inputs=list()
178  # self.real_input=None
179  # def add_merge_input(filename):
180  # self.merge_inputs.append(UpstreamFile(self.dstore,self.conf,self.taskname,location=self.confstrinterp(filename)))
181  # return self
182  # def set_real_input(filename)
183  # self.real_input=UpstreamFile(self.dstore,self.conf,self.taskname,location=self.confstrinterp(filename))
184  # return self
185  # def wait_for_things(self):
186  # plist=list(self.merge_inputs)
187  # plist.append(self.real_input)
188  # wait_for_products(plist,self.log())
189  def update_state(self):
190  self.state=COMPLETED
191 
192  def wrfinput_at_time(self,atime,domain):
194 
195  def run(self):
196  """Performs all work needed to run the Final Merge program and create
197  the final merged product. Creates the work directory, CD's to it,
198  Creates a domain mapping dictionary for all the storms. ,link_all_inputs,
199  run_exe, and deliver_products."""
200 
201  logger=self.log()
202  # NOTE: some of these are sent as postmsg instead of
203  # logger.info. That ensures they are in the jlogfile, which
204  # contains information across all cycles of all storms and all
205  # models. That way, we can find unexpected cold starts.
206 
207  self.postmsg('Final Merge Task running in directory: '+os.getcwd())
208 
209  # WORKhwrf/intercom/finalmerge
211 
212  runhere=self.workdir
213  self.keeprun = True
214  if os.path.exists(runhere):
215  logger.warning('%s: directory exists; will delete'%(runhere,))
216  assert(not os.path.samefile(self.getdir('WORKhwrf'),runhere))
217  shutil.rmtree(runhere)
218  # jtfst rundir ?
219  with NamedDir(runhere,keep=self.keeprun,logger=logger,
220  keep_on_error=True) as rundir:
221  try:
222  self.state=RUNNING
223  logger.info('%s running in directory %s'%(
224  self.taskname,realcwd()))
225 
226  # Copies all required input file in to the finalmerge working directory.
227  self.copy_inputs()
228 
229  gfsrealinit, gdas_merge = self.convert_wrfinput2bin()
230  self.final_merge(gfsrealinit, gdas_merge)
231  self.finalmerge2netcdf()
232 
233  # self._prod_wrfinput_d01_finalmerge.deliver(
234  # frominfo=self.fmprodname,location=os.path.join(
235  # self.outdir,self.fmprodname))
236 
237  self.deliver_products()
238  self.update_state()
239  self._prod_wrfinput_d01_finalmerge.available=True
240 
241  except Exception as e:
242  self.state=FAILED
243  logger.critical('%s failed: %s'%(self.taskname,str(e)),
244  exc_info=True)
245  raise
246  self.update_state()
247  self.postmsg('%s: completed'%(self.taskname,))
248 
249 
250  # =========================================================================
251  # The hwrf_final_merge program input requires the wrfinput_d01 input files
252  # to be binary, not NetCDF. So they must be converted.
253  #
254  # The first storm in the list is considered the priority storm.
255  # Converts the 'gfsinit/realinit' task's wrfinput_d01 file from netcdf to binary.
256  # Converts the 'gdas_merge' task wrfinput_d01 files for all storms in the
257  # multistorm from netcdf to binary.
258  # ==========================================================================
260  """Runs the hwrf_diffwrf_3dvar program to convert all the required
261  wrfinput_d01 files from netcdf to binary.
262 
263  returns a tuple (gfsrealinit_bin, gdas_merge_bins)
264  with the the binary filenames that will be used
265  as input for the final merge executable.
266  """
267  self.log().info('convert_wrfinput_d01_2bin')
268  logger=self.log()
269  fprog = 'hwrf_3dvar'
270  prog = self.getexe(fprog)
271 
272  # TODO: CRITICAL, QuickImpl section Setting up links and paths - needs rework and hardening <jtf>
273  #common to all storms, removes slash and sid from WORKhwrf, '/04E'
274  commonWORKhwrf = self.getdir('WORKhwrf')[:-4]
275 
276  # Setup sym links from this FinalMergeTask workdir to each storms
277 
278  # Create the priority storm gfsinit/realinit sym links and .nc and .bin filenames.
279  gfsrealinit_nclinkname='wrfinput_d01_'+self.priority_stormid+'gfsrealinit.nc'
280  gfsrealinit_ncfile=os.path.join(self.workdir,'wrfinput_d01')
281  gfsrealinit_bin='wrfinput_d01_'+self.priority_stormid+'gfsrealinit.bin'
282  make_symlink(gfsrealinit_ncfile,gfsrealinit_nclinkname,force=True, logger=logger)
283 
284  fin_target = gfsrealinit_nclinkname
285  fout_target = gfsrealinit_bin
286 
287  # Convert gfsrealinit netcdf to binary.
288  log = '%s/logs/%s_%s_%s.log' %(
289  self.dest_dir, self.__class__.__name__, fprog, 'gfsrealinit2bin'+self.priority_stormid)
290  cmd = produtil.run.exe(prog)['storm_relocate', fin_target, 'flnm3', fout_target]
291  if self.redirect: cmd = cmd >= log
292  produtil.run.checkrun(cmd,logger=logger)
293 
295  # For all the gdas_merge files, in the finalmerge.workdir,
296  # Create symlinks to the .nc files and generate the binary files.
297  # /intercom/gdas_merge/wrfinput_d01
298  gdas_merge_bins = []
299  for stormid in self.real_stormids:
300  gdasmerge_nclinkname = os.path.join(self.workdir, 'wrfinput_d01_'+stormid+'gdas_merge.nc')
301  #gdasmerge_ncfile = os.path.join(commonWORKhwrf,stormid,'intercom/gdas_merge/wrfinput_d01')
302  #make_symlink(gdasmerge_ncfile,gdasmerge_nclinkname,force=True, logger=logger)
303 
304  # Use the basename instead of the absolute path. hwrf_diffwrf_3dvar input
305  # path/filename <= 124 characters. Since this task is running in the
306  # finalmerge directory using the basename should be ok.
307  fin_target = os.path.basename(gdasmerge_nclinkname)
308  fout_target = 'wrfinput_d01_'+stormid+'gdas_merge.bin'
309  gdas_merge_bins.append(fout_target)
310 
311  log = '%s/logs/%s_%s_%s.log' %(
312  self.dest_dir, self.__class__.__name__, fprog, 'gdas_merge2bin'+stormid)
313 
314  cmd = produtil.run.exe(prog)['storm_relocate', fin_target, 'flnm3', fout_target]
315  if self.redirect: cmd = cmd >= log
316  produtil.run.checkrun(cmd,logger=logger)
317 
318  return gfsrealinit_bin, gdas_merge_bins
319 
320  def finalmerge2netcdf(self):
321  """Runs the hwrf_diffwrf_3dvar program to convert the final
322  merge binary file to netcdf."""
323  self.log().info('convert_wrfinput_d01_2bin')
324  logger=self.log()
325  fprog = 'hwrf_3dvar'
326  prog = self.getexe(fprog)
327 
328 
329  # Note: script that this command was modeled after had created another fort.56 link
330  # fort.56 --> self.fmprodname+'.bin', seems like that is not required, but I'll also.
331  make_symlink(self.fmprodname+'.finalmerge.bin','fort.56',force=True, logger=logger)
332  # Do not join self.workdir to fin_target or fout_target files. There is a 124 character limit
333  # to the hwrf_diffwrf_3dvar executables input, output path/filename
334  fin_target = 'fort.56'
335  fout_target = self.fmprodname
336  #fin=os.path.join(self.workdir,'fort.56')
337  #fout=os.path.join(self.workdir,self.fmprodname)
338  log = '%s/logs/%s_%s_%s.log' %(
339  self.dest_dir, self.__class__.__name__, fprog, 'finalmerge2nc'+self.fake_stormid)
340  cmd = produtil.run.exe(prog)['3dvar_update',fout_target, fin_target]
341  if self.redirect: cmd = cmd >= log
342  produtil.run.checkrun(cmd,logger=logger)
343 
344 
345  # The output of the final merge exe is a binary file and
346  # that needs to be converted to netcdf.
347  def final_merge(self, gfsrealinit_bin, gdasmerge_bins):
348  """Runs the hwrf-utilities hwrf_final_merge program with
349  the following inputs and generates the final merge output file,
350  all in this tasks self.workdir .
351 
352  gfsrealinit_bin - the name of the d01 priority storm
353  gfsrealinit binary filename
354 
355  gdasmerge_bins - a list of gdas_merge d01 binary
356  filenames for each real storm"""
357 
358  self.log().info('final_merge')
359  fprog = 'hwrf_final_merge'
360  logger=self.log()
361  prog = self.getexe('hwrf_final_merge')
362  numstorms = len(self.real_stormids)
363  gesfhr = 6
364 
365 
366  # fort.nn links used are Based on the hwrf_final_merge.f90 source file.
367  # The write is IUNIT=50+ITIM, where ITIM is the passed in gesfhr = 6. So fort.56
368  # fort.40 is the gfsinit realinit d0 binary file
369  # fort.41 ++ gdas_merge d0 binary file incremented for each storm in multistorm.
370 
371  # gesfhr is always 6
372  evars = [ gesfhr,
373  numstorms ]
374 
375  ins = { 40:gfsrealinit_bin }
376  for i, gdas_merge_bin in enumerate(gdasmerge_bins):
377  ins[i+41] = gdas_merge_bin
378 
379  # This fort.56 link is broken, until after run_ext generates self.fmprodname
380  ous = { 56:self.fmprodname+'.finalmerge.bin' }
381 
382  self.run_ext(fprog, echo=evars, inputs=ins, outputs=ous)
383 
384  def products(self,**kwargs):
385  """Iterates over all products generated by this task."""
387 
388  # Modeled from fcstask.WRFTaskBase.add_wrfinput.
389  # Currently used for just adding gfsinit/realinit/wrfinput_d01
390  # from the gfs_init.realinit object in the hwrf_expt.py
391  def add_wrfinput(self,r,stormid):
392  """Adds an input source (via self.add_input) that will provide
393  the wrfinput output file from real_nmm. The given object must
394  have a wrfinput_at_time(atime,domain) function that returns a
395  Product for a given analysis time and WRFDomain object.
396  Returns self."""
397  # TODO: think, Do I need to return , not in this case <jtf>
398  # return self.add_input('wrfinput',WRFInput2FinalMerge(r))
399  self.add_input('wrfinput',WRFInput2FinalMerge(stormid,r))
400 
401  def add_merge(self,r,stormid):
402  """Adds an input source (via self.add_input) that will provide
403  the wrfinput output file from a prior gdas_merge, real_nmm or
404  relocation. This is used for the storm's final output of the
405  per-storm initialization."""
406  return self.add_input('merge',Merge2FinalMerge(stormid,r))
407 
408  def deliver_products(self,missing=None,logger=None,keep=False,
409  frominfo=None,**kwargs):
410  """Delivers products to intercom via Product.deliver. Any
411  keyword arguments are passed on to Product.deliver. By
412  default, keep=False, which means the local copy of the file
413  may no longer exists. If frominfo is specified, it will be
414  ignored."""
415  if logger is None: logger=self.log()
416  logger.warning('Delivering products for %s'%(self.taskname,))
417  #TODO: CRITICAL Look in to Had to change dir was using self.location
418  produtil.fileop.makedirs(self.outdir,logger=logger)
419 
420 
421  # TODO: CRITICAL Hmmmm Look in to why yield no p, products in self.products()
422  # for p in self.products():
423  # loc=p.location
425  loc=p.location
426  bloc=os.path.basename(loc)
427  if os.path.exists(bloc):
428  logger.warning('%s: deliver product from ./%s'%(p.did,bloc))
429  p.deliver(frominfo=bloc,keep=keep,logger=logger,**kwargs)
430  else:
431  logger.warning('%s: ./%s does not exist. Cannot deliver.'
432  %(p.did,bloc))
433  if missing is not None:
434  missing(p,bloc)
435 
436  # This copies all the inputs needed by the final merge task from the
437  # com directory of each storm to the final_merge working diretory.
438  # copy_inputs and _make_plist_and_names is used to copy a file from
439  # self.outdir = intercom/finalmerge to a products self.workdir. copy_inputs
440  # reads the FileProduct availability flag in the datastore. Once it is
441  # available |1| it copies the from location specified in the datastore to
442  # the workdir. The products in this list "plist", are products this finalmerge
443  # task is waiting on. It is another Task outside of this task, that is generating and
444  # updating the available flag for the product. This task is just waiting for
445  # the products to become available so this task can do its work.
446  # For example, FinalMerge is waiting on and needs to copy wrfinput_d01 from
447  # <prioritySID>/intercom/gfsinit/realinit/wrfinput_d01 to <fakstormSID>/finalmerge.
448  # For right now ... that is all final merge is using it for.
449  # TODO: Change the following and implement correctly. <jtf>
450  # At the moment The finalmerge initialization sets all the products
451  # needed to run, both the finalmerge and WRF to |1|, in order for the links
452  # to be generated. Products are assumed to be there and workflow product
453  # dependencies are controlled with rocoto.
454 
455  # From relocate.RelocationTask.copy_inputs.
456  def copy_inputs(self):
457  """Copies, or makes, one or more input files."""
458  logger=self.log()
459  (plist,names,action)=self._make_plist_and_names()
460  def namer(p,logger,*args): return names[p]
461  def actor(p,name,logger,*args): action[p](p,name,logger,*args)
462 
463  # Loop over all provided products, wait for them, and copy
464  # them:
465  # TODO: Assess I was using this, but commented out after Sam's
466  # mods. This kept waiting for products realinit/wrfinput_d01
467  produtil.datastore.wait_for_products(plist,logger,namer,actor)
468 
469  # Sets all the inputs needed for the final merge task.
470  # plist is list of {FileProduct} objects that you set in the names dictionary.
471  # ie. So this will copy wrfinput_d01.location to finalmerge.working dir.
472  # From relocate.RelocationTask._make_plist_and_names.
473  def _make_plist_and_names(self):
474  """This is an internal implementation function that should not
475  be called directly. It returns a three-element tuple
476  containing a list of products, and a dict mapping from product
477  to the local filename, and a dict mapping from product to the
478  copy method. This is used to implement copy_inputs, to copy
479  input files to the local directory from remote tasks'
480  Products."""
481  def copier(p,name,logger,*args):
482  deliver_file(p.location,name,logger=logger,keep=True)
483  def linker(p,name,logger,*args):
484  make_symlink(p.location,name,force=True,logger=logger)
485 
486  # the names dictionary holds all the required input
487  # as UpstreamFile objects.
488  # The keys are the names of the files as they will be copied
489  # to in the 00L/finalmerge work diretory.
490  # ie. wrfinput_d01, wrfinput_d01_04Egdas_merge.nc, wrfinput_d01_05Egdas_merge.nc
491  names=dict()
492 
493  # Sets the priority storms gfsinit/realint wrfinput_d01.
494  # self._wrfinput is and UpstreamFile with a .location of
495  # com/../04E.2012071000.multistorm.wrfinput_d01, for example.
496  self._wrfinput_d01=self.inputs['wrfinput'][0].src.get_wrfinput_gfsinit_realinit()
497  names[self._wrfinput_d01]='wrfinput_d01'
498 
499  # Sets all the gdas_merge files for each storm.
500  # ie. com/../04E.2012071000.multistorm.wrfinput_d01_04Egdas_merge.nc
501  for merge in self.inputs['merge']:
502  mergeprod=merge.src.get_merge(merge.stormid)
503  assert(isinstance(mergeprod,produtil.datastore.Product))
504  tgtname=merge.tgtname
505  self.log().info('Product %s location %s going to %s'%(
506  mergeprod.did,mergeprod.location,tgtname))
507  names[mergeprod]=tgtname
508 
509  #names[self._prod_wrfinput_d01_finalmerge]='wrfinput_d01_00L_04Efm.nc'
510  #names[self._ghost_d02]='wrfghost_d02'
511  plist=[ k for k in names.iterkeys() ]
512  actions=dict( (n,copier) for n in names.iterkeys() )
513  return ( plist, names, actions )
514 
515  # This method is just for local development and testing in which
516  # you can't run any of the binaries.... It should never be called
517  # in production.
518  # Setup a fake merged output file
519  # Edit fmfile path as needed so it referes to where the fakefile exists.
520  # self.fmprodname is just a dummy text file.
521  def _fake_merge_file(self, sleep_secs):
522  """Create a link to a phony final merge file for initial dev testing.
523  without having to run the final merge executable"""
524 
525  logger = self.log()
526  import time
527  logger.info('fake_merge_file going to sleep for %s seconds'%sleep_secs)
528  time.sleep(sleep_secs)
529 
530  fmlinkname = os.path.join(self.workdir,self.fmprodname)
531 
532  # absolute paths
533  #fmfile = os.path.join(self.getdir('WORKhwrf'),'finalmerge.t',self.fmprodname)
534 
535  # relative paths, should test where you are.
536  fmfile = os.path.join('../finalmerge.t',self.fmprodname)
537 
538  make_symlink(fmfile,fmlinkname,force=True, logger=logger)
539 
540 
541  def run_ext(self, cmd, echo=None, inputs=None, incopies=None,
542  outputs=None, opt_outputs=None):
543  """Run an external command linking in fort.X files for input
544  and output. If redirect=True, redirect logs to a separate file.
545 
546  It will use "self.getexe()" on the command to find the
547  external program to execute.
548 
549  If a list is passed in as the echo variable, then /bin/echo
550  will be run with that list and piped to the external command.
551 
552  The inputs dictionary should consist of a fortran file
553  number and the source file, such as:
554  inputs = {11:tcvitals, 12:wrfout_d01}
555  would produce symbolic links:
556  fort.11 -> tcvitals
557  fort.12 -> wrfout_d01
558  input files can also be copied using incopies:
559  incopies = {11:tcvitals, 12:wrfout_d01}
560  would create files instead of links.
561 
562  The outputs and opt_outputs (optional outputs) should be of the
563  dictionary as the inputs. As in:
564  outputs = {56:new_data_4x, 85:storm_radius}
565  this would mean the "fort.56" file would be renamed to "new_data_4x"
566  and the "fort.85" renamed to "storm_radius".
567 
568  If opt_outputs is given then the fortran file is tested to see if it
569  exists and only if it does is it renamed to the output filename.
570 
571  A log file will be created consisting of the stdout and stderr of the
572  command run. It will be named consisting of the taskname and command.
573  For example, if this is relocation stage 1 and the command is
574  hwrf_pert_ct then the log file is "rel_stage_1_hwrf_pert_ct.log" """
575 
576  cmdname=str(cmd)
577  logger = self.log()
578  prog = self.getexe(cmdname)
579  logf = '%s/logs/%s_%s.log' %(self.dest_dir,
580  self.__class__.__name__, cmdname)
581 
582  # Build up the command
583  if echo:
584  echostr=""
585  for s in echo:
586  if isinstance(s,float): echostr+="%g "%(s,)
587  elif isinstance(s,int): echostr+="%d "%(s,)
588  else: echostr+="%s "%(str(s),)
589  logger.info(
590  'Converted %s to %s for stdin input to fortran command.'
591  %(repr(echo),repr(echostr)))
592  echostr+="\n"
593  cmd = produtil.run.openmp(produtil.run.bigexe(prog)) << echostr
594  else:
596 
597  # If redirection is requested, do so:
598  if self.redirect: cmd = cmd >= logf
599 
600  # Clean up all the fortran inputs and outputs
601  empty={}
602  if inputs is None: inputs=empty
603  if outputs is None: outputs=empty
604  if incopies is None: incopies=empty
605  iof = dict(itertools.chain(inputs.items(), outputs.items(),
606  incopies.items()))
607  for k in iof:
608  produtil.fileop.remove_file('fort.'+str(k),logger=logger)
609 
610  # Link the inputs
611  if inputs:
612  produtil.fileop.fortlink(inputs, force=True,logger=logger)
613 
614  if incopies:
615  produtil.fileop.fortcopy(incopies, force=True,
616  only_log_errors=True, logger=logger)
617 
618  # Run the command
619  logger.warning(repr(cmd)) # use logger.warning so it is in stderr
620  produtil.run.checkrun(cmd, logger=logger)
621 
622  # Rename the outputs
623  if outputs:
624  for k, v in outputs.iteritems():
625  ffile='fort.'+str(k)
626  if os.path.exists(ffile):
627  deliver_file(ffile, v, keep=False,logger=logger)
628  else:
629  logger.error('%s: did not make file %s (would mv to %s)'
630  %(cmdname,ffile,str(v)))
631 
632  # Rename the optional outputs if they exist
633  if opt_outputs:
634  for k, v in opt_outputs.iteritems():
635  ffile = 'fort.' + str(k)
636  if os.path.exists(ffile):
637  deliver_file(ffile, v, keep=False,logger=logger)
638  else:
639  logger.warning(
640  '%s: did not make file %s (would mv to %s).'
641  %(cmdname,ffile,str(v)))
642 
643  # Clean up the input links
644  for k,v in inputs.iteritems():
645  if os.path.islink('fort.'+str(k)):
646  logger.info('%s: deleting input fort file (symlink to %s)'
647  %('fort.'+str(k),v))
648  produtil.fileop.remove_file('fort.'+str(k),logger=logger)
649 
650  # Clean up the input copies
651  for k,v in incopies.iteritems():
652  if os.path.exists('fort.'+str(k)):
653  logger.info('%s: deleting input fort file (copy of %s)'
654  %('fort.'+str(k),v))
655  produtil.fileop.remove_file('fort.'+str(k),logger=logger)
656 
657 
658 
659  # TODO: Update and use this run_exe for the final merge task or get rid of it. <jtf>
660  # This run_exe is from fcsttask.py WRFTaskBase(FcstTask)
661  # Currently is not being used.
662  def run_exe(self,exename='final_merge',runner=None,sleeptime=None):
663  """Runs the executable this task is responsible for running.
664  Determines if the program ran correctly. The exename is the
665  name of the argument in the [exe] section of the
666  HWRFConfig. Options:
667 
668  runner=Runner - pass a produtil.prog.Runner object if
669  desired. This overrides any decision of what to run: the
670  exename will be ignored, and whatever is supplied in runner
671  is simply passed to produtil.run.run.
672  sleeptime - passed to produtil.run.run to determine how
673  often to check the child process. By default, the sleeptime
674  option in this task's config section is used, or if that is
675  absent, 30 seconds."""
676  if sleeptime is None:
677  sleeptime=self.conffloat('sleeptime',30)
678  logger=self.log()
679  if runner is None:
680  exe=self.getexe(exename)
681  runner=produtil.run.exe(exe)
682  stat=produtil.run.run(runner,logger=logger,sleeptime=sleeptime)
683  logger.info('%s: exit status %d'%(exename,stat))
684  if not check_last_lines('rsl.out.0000','SUCCESS COMPLETE',
685  logger=logger):
686  msg='%s: did not see SUCCESS COMPLETE in rsl.out.0000'%(exename,)
687  logger.error(msg)
688  raise RealNMMError(msg)
689  else:
690  logger.info('%s: SUCCESS COMPLETE in rsl.out.0000'%(exename,))
691 
692 
693 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
Imitates the shell "ls -l" program.
Definition: listing.py:9
abstract base class of anything that runs or prepares input for a forecast model
Definition: fcsttask.py:363
Raised when one tries to use an invalid string for an option name.
Definition: exceptions.py:26
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
def redirect(self)
Should subprograms' outputs be redirected to separate files?
Definition: hwrftask.py:190
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
A subclass of Product that represents file delivery.
Definition: datastore.py:856
def wait_for_products
Waits for products to be available and performs an action on them.
Definition: datastore.py:979
def remove_file
Deletes the specified file.
Definition: fileop.py:251
def final_merge(self, gfsrealinit_bin, gdasmerge_bins)
def fortcopy(forts, basedir=None, logger=None, only_log_errors=False, kwargs)
A convenience function for copying files to local fort.N files for various integers N using deliver_f...
Definition: fileop.py:868
def checkrun(arg, logger=None, kwargs)
This is a simple wrapper round run that raises ExitStatusException if the program exit status is non-...
Definition: run.py:398
def run(arg, logger=None, sleeptime=None, kwargs)
Executes the specified program and attempts to return its exit status.
Definition: run.py:376
def openmp
Sets the number of OpenMP threads for the specified program.
Definition: run.py:415
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
A piece of data produced by a Task.
Definition: datastore.py:716
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
Runs the real_nmm or wrf executables.
Definition: fcsttask.py:1
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def makedirs
Make a directory tree, working around filesystem bugs.
Definition: fileop.py:224
def fortlink
This is a convenience routine that makes many symbolic links to fort.N files for various integers N u...
Definition: fileop.py:834
def get_inputs(self, just_check=False, kwargs)
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def conffloat
Alias for self.conf.getfloat for section self.section.
Definition: hwrftask.py:274
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def add_wrfinput(self, r, stormid)
inputs
a mapping of typename to a list of input objects
Definition: fcsttask.py:381
Contains the Listing class, which emulates "ls -l".
Definition: listing.py:1
def add_input(self, typename, inobj)
add input of a specified type
Definition: fcsttask.py:393
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def exe(name, kwargs)
Returns a prog.ImmutableRunner object that represents a large serial program that must be run on a co...
Definition: run.py:242
def postmsg(self, message, args, kwargs)
same as produtil.log.jlogger.info()
Definition: datastore.py:1084
def deliver_products(self, missing=None, logger=None, keep=False, frominfo=None, kwargs)
def isfakestorm(self)
Definition: hwrftask.py:113
def bigexe(name, kwargs)
Alias for exe() for backward compatibility.
Definition: run.py:254