HWRF  trunk@4391
copywrf.py
1 """!Contains the WRFCopyTask, which delivers WRF output and input.
2 
3 This module contains the implementation of WRFCopyTask, which delivers
4 WRF input and output files. It maintains three sets of deliveries:
5 
6 * initial - files created before the wrf execution begins
7 * wrfprod - files created by wrf during its execution
8 * final - files created at the end of the wrf execution, or just after wrf ends
9 
10 The WRFCopyTask.run() and WRFCopyTask.runpart() monitor the WRF
11 execution and copy files as needed.
12 
13 Examples:
14 
15 @code
16 wrf=WRFSimulation(...)
17 ... add domains to the wrf simulation ...
18 sim=ExternalWRFTask(dstore,conf,section,wrf,...more stuff...)
19 wrfcopier=WRFCopyTask(dstore,conf,"wrf_copier",sim,
20  "{vit[stormname]}{vit[stormid3]}.{YMDH}.")
21 wrfcopier.d_initial('namelist.input')
22 wrfcopier.d_initial('wrfinput_d01')
23 wrfcopier.d_initial('wrfbdy_d01')
24 wrfcopier.d_initial('fort.65')
25 fhr=0
26 while fhr-0.001<126:
27  for product in sim.products(time=3600*fhr,stream="history"):
28  wrfcopier.d_wrfprod(product,check=False)
29  fhr+=3
30 wrfcopier.d_final('track_d03.patcf')
31 wrfcopier.run()
32 @endcode
33 
34 This example will monitor an external WRF simulation. It will deliver
35 four files at the beginning of the simulation: namelist.input,
36 wrfinput_d01, wrfbdy_d01 and fort.65. One file will be delivered at
37 the end: track_d03.patcf. All three-hourly history stream files
38 (wrfout_d...) will be delivered as they become available. The files
39 will be delivered to com, prepending the storm name, id and cycle.
40 
41 For example, this file
42  wrfout_d01_2016-08-14_09:00:00
43 will go here:
44  /path/to/com/invest97l.2016081400.wrfout_d01_2016-08-14_09:00:00
45 due to the wrfcopier.d_wrfprod() call.
46 
47 All of the delivery happens in the last line of the code example when
48 WRFCopyTask.run() is called. That call will not return until all files
49 are delivered or the WRF model fails.
50 """
51 
52 __all__ = ['WRFCopyTask']
53 
import os, re, time
import produtil.fileop, produtil.locking
import hwrf.hwrftask, hwrf.wrf

from produtil.run import checkrun, run, exe, bigexe, mpi, mpirun
from produtil.datastore import COMPLETED,RUNNING,UNSTARTED,Product,\
    FileProduct,UpstreamFile
from hwrf.hwrftask import HWRFTask
62 
63 ##@var __all__
64 # list of symbols exported by "from hwrf.copywrf import *"
65 __all__=['WRFCopyTask']
66 
68  """!@brief wrf file delivery task
69  @details This is a Task that copies WRF input and output files
70  from the WRF run directory to the COM directory."""
def __init__(self,dstore,conf,section,wrftask,out_prefix,**kwargs):
    """!@brief WRFCopyTask constructor
    @details Sets up an empty copy task: no deliveries are requested
    yet, and ncks has not been searched for.  If the caller does not
    supply an "outdir" keyword argument, conf.getdir('com') is used.
    @param dstore the produtil.datastore.Datastore
    @param conf the hwrf.config.HWRFConfig
    @param section the section to use in conf
    @param wrftask the task that runs WRF.  This should be an
      hwrf.wrf.ExternalWRFTask, or a subclass of
      hwrf.fcsttask.WRFTaskBase
    @param out_prefix output file prefix, stored as self.out_prefix
    @param kwargs passed to the superclass constructor"""
    # Only consult the configuration when the caller gave no outdir.
    if 'outdir' not in kwargs:
        kwargs['outdir']=conf.getdir('com')
    super(WRFCopyTask,self).__init__(dstore,conf,section,**kwargs)

    # The simulation being watched, and the naming prefix for output.
    self._wrftask=wrftask
    self.out_prefix=out_prefix

    # The three delivery groups: before, during and after the WRF run.
    self._initial,self._wrfprod,self._final=[],[],[]

    # ncks lookup cache: no path known, and no failed search yet.
    self._ncks_path=False
    self._could_not_find_ncks=False
93 
94  ##@var out_prefix
95  # Prefix to prepend to output filenames after the com path. This
96  # string is sent through the self.confstrinterp, so it can contain
97  # {...} escape sequences.
98 
99  ##@var _wrftask
100  # The wrftask argument to __init__, the WRF simulation being monitored.
101 
102  ##@var _initial
103  # The list of WRF initial state deliveries.
104 
105  ##@var _wrfprod
106  # The list of deliveries to make during WRF execution.
107 
108  ##@var _final
109  # The list of deliveries to make after WRF finishes.
110 
111  ##@var _ncks_path
112  # The path to ncks or the constant False if ncks is missing or unused.
113 
114  ##@var _could_not_find_ncks
115  # True if we searched for ncks, could not find it, and gave up looking.
116 
@property
def ncks_path(self):
    """!@brief Returns the path to ncks.
    @details Returns the path to the ncks program, used to convert
    between NetCDF 3 and compressed NetCDF 4 file formats.  The
    "ncks" option of the exe configuration section is used if it is
    set to a non-empty value; otherwise produtil.fileop.find_exe()
    is asked to locate ncks.  The search is done at most once and
    its result is cached.  To force a recheck, set both
    self._ncks_path=False and self._could_not_find_ncks=False.
    @returns the path to ncks, or the constant False if ncks could
    not be found"""
    if self._ncks_path is False and not self._could_not_find_ncks:
        ncks=self.getexe('ncks','')
        # Fall back to a search only when nothing was configured.
        # (This used to test self._ncks_path, which is always False
        # here, so the configured location was always discarded.)
        if not ncks:
            ncks=produtil.fileop.find_exe('ncks',raise_missing=False)
        if ncks:
            self._ncks_path=ncks
        else:
            # Remember the failure so we do not search again.
            self._could_not_find_ncks=True
    return self._ncks_path
134 
def compression_copier(self,src,vsubset=None):
    """!@brief creates and returns a compression_copier for deliver_file
    @details Returns the object that should be sent as the "copier"
    argument to produtil.fileop.deliver_file() to copy the given
    source file.  This is either None, or a function that runs
    ncks with the arguments -4 -L 6 to write the target file.
    @param src the source file
    @param vsubset unused, but may one day be used to subset the file
    @returns None if ncks is unavailable or if
    produtil.fileop.netcdfver() returns None for the source file.
    Otherwise, a copy(s,t,x) function is returned, suitable for
    passing to the copy argument of
    produtil.fileop.deliver_file()"""
    ncks=self.ncks_path
    # No ncks, or not a NetCDF file: the caller gets None and the file
    # is delivered without going through ncks.
    if not ncks or produtil.fileop.netcdfver(src) is None:
        return None

    # Source file IS NetCDF, or possibly non-NetCDF HDF5, but we'll
    # overlook that.
    logger=self.log()
    def copy(s,t,x):
        # s is the source and t the target; x is accepted but unused.
        # Any old target is removed first, and checkrun raises if
        # ncks exits with a non-zero status.
        produtil.fileop.remove_file(t,logger=logger)
        command=bigexe(ncks)['-4','-L','6',s,t]<'/dev/null'
        checkrun(command,logger=logger)
    return copy
165 
def decompression_copier(self,src):
    """!@brief returns a decompression copier for deliver_file
    @details Returns an object that has the reverse effect of
    self.compression_copier: it runs ncks with the -6 argument, so
    NetCDF files will all be converted to 64-bit indexing NetCDF 3
    files.
    @param src the source file
    @returns None if ncks is unavailable or if
    produtil.fileop.netcdfver() returns None for the source file.
    Otherwise, a copy(s,t,x) function is returned, suitable for
    passing to the copy argument of produtil.fileop.deliver_file()"""
    ncks=self.ncks_path
    # No ncks, or not a NetCDF file: the caller gets None and the file
    # is delivered without going through ncks.
    if not ncks or produtil.fileop.netcdfver(src) is None:
        return None

    # Source file IS NetCDF, or possibly non-NetCDF HDF5, but we'll
    # overlook that.
    logger=self.log()
    def copy(s,t,x):
        # s is the source and t the target; x is accepted but unused.
        # Any old target is removed first, and checkrun raises if
        # ncks exits with a non-zero status.
        produtil.fileop.remove_file(t,logger=logger)
        command=bigexe(ncks)['-6',s,t]<'/dev/null'
        checkrun(command,logger=logger)
    return copy
191 
def comfile(self,orig,destname=None):
    """!@brief get the full path to a file
    @details Generates a full path to the delivery location of the
    specified source file.  Returns the full path and the basename
    in a tuple.  When destname is None, the delivered name is
    self.out_prefix, a "." and the source basename with any
    trailing timestamp rewritten as YYYY-MM-DD_HH:MM:SS (text after
    the timestamp is dropped).  The prefix is used as-is in that
    case.  When destname is given, it is expanded by
    confstrinterp() with three extra variables: inname (the source
    basename), inname_colon (the rewritten basename) and
    inname_colon_s00 (the same, with the seconds forced to 00).
    @returns a tuple (path,basename) where the path is
      os.path.join(self.outdir,basename)
    @param orig the original filename, or a Product whose location
      is the original filename
    @param destname optional: the desired destination name format
      which will be sent through hwrf.hwrftask.HWRFTask.confstrinterp()
      to generate the final destination filename"""
    source=orig.location if isinstance(orig,Product) else orig
    bn=os.path.basename(str(source))

    # A six-field timestamp at the end of the name, with its fields
    # separated by any of "_", "." or "-".  Rewriting it with - between
    # date fields, _ between date and time and : between time fields
    # matches the syntax expected by most programs external to this
    # workflow.
    stamp=('([0-9][0-9][0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])'
           '[_.-]([0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])[^/]*$')
    bn_colon=re.sub(stamp,r'\1-\2-\3_\4:\5:\6',bn)

    if destname is None:
        fullbn='%s.%s'%(self.out_prefix,bn_colon)
    else:
        bn_colon_s00=re.sub(stamp,r'\1-\2-\3_\4:\5:00',bn)
        fullbn=self.confstrinterp(destname,inname=bn,
                                  inname_colon=bn_colon,
                                  inname_colon_s00=bn_colon_s00)
    return ( os.path.join(self.outdir,fullbn), fullbn )
226 
def d_initial(self,inprod,check=None,destname=None):
    """!@brief deliver a file generated before WRF starts
    @details Requests delivery of a file that is created before the
    wrf.exe invocation, by adding it to the "initial" delivery
    group.  The "inprod" may be a Product or a filename relative to
    the WRF run directory.  The optional "check" argument enables
    calling the potentially expensive "check" subroutine on the
    upstream product every time it is considered for delivery.  If
    the input is a Product, and check=False, then only the
    "available" subroutine is called, which will not be updated
    unless another Task marks the product as available.  The
    default is check=False for Products and check=True for
    filenames.
    @param inprod the input produtil.datastore.Product or filename
    @param check do we need to call the product's check()
      function?  This is needed if it is a produtil.datastore.UpstreamFile
    @param destname optional: the destname argument to comfile()
      which will be used to generate the delivered filename in
      the COM directory.
    @returns whatever _deliver_to_group() returns"""
    group=self._initial
    return self._deliver_to_group(
        group,inprod,check=check,destname=destname)
246 
def d_final(self,inprod,check=None,destname=None):
    """!@brief deliver a file generated at the end of the WRF run
    @details Requests delivery of a file created by WRF that is not
    complete until the WRF exits, by adding it to the "final"
    delivery group.  Examples of this are the wrfdiag, hifreq and
    patcf files.  These files will be delivered when the underlying
    WRF Task has a state of COMPLETED.  The optional "check"
    argument enables calling the potentially expensive "check"
    subroutine on the upstream product every time it is considered
    for delivery.  If the input is a Product, and check=False, then
    only the "available" subroutine is called, which will not be
    updated unless another Task marks the product as available.
    The default is check=False for Products and check=True for
    filenames.
    @param inprod the input produtil.datastore.Product or filename
    @param check do we need to call the product's check()
      function?  This is needed if it is a produtil.datastore.UpstreamFile
    @param destname optional: the destname argument to comfile()
      which will be used to generate the delivered filename in
      the COM directory.
    @returns whatever _deliver_to_group() returns"""
    group=self._final
    return self._deliver_to_group(
        group,inprod,check=check,destname=destname)
267 
def _deliver_to_group(self,group,inprod,check=None,destname=None):
    """!@brief internal function that arranges for future file delivery
    @details Do not call this function directly.  It is the internal
    implementation of d_initial and d_final.  Call those functions
    instead.  It appends a tuple (upstream product, product to
    deliver, check flag) to the given group.
    @param group which group does this belong to?  Should be
      self._initial, self._wrfprod or self._final
    @param inprod the input produtil.datastore.Product, or a
      filename relative to the WRF task's location
    @param check do we need to call the product's check()
      function?  None means False for a Product.  This argument is
      ignored for filenames, which are always checked.
    @param destname optional: the destname argument to comfile()
      which will be used to generate the delivered filename in
      the COM directory.
    @returns self"""
    (comfile,combn)=self.comfile(inprod,destname=destname)

    if isinstance(inprod,Product):
        # The caller's own Product; a check of None counts as False.
        upstream=inprod
        do_check=bool(check)
    else:
        # Make an internal UpstreamFile to check for the file:
        wrffile=os.path.join(self._wrftask.location,inprod)
        upstream=UpstreamFile(
            dstore=self.dstore,prodname=combn,
            category="%s-upstream"%(self.taskname,),location=wrffile)
        upstream.location=wrffile
        # Nobody else can see this Product so we must check it:
        do_check=True

    product=FileProduct(
        dstore=self.dstore,prodname=combn,category=self.taskname,
        location=comfile)
    group.append( (upstream,product,do_check) )
    return self
def d_wrfprod(self,product,check=False,destname=None):
    """!@brief deliver a file generated during the WRF simulation
    such as a history or restart file
    @details Requests delivery of a WRF I/O subsystem output file,
    by adding it to the "wrfprod" delivery group.  The "product"
    argument must be a Product object.  The optional argument
    "check" enables calling the potentially expensive "check"
    subroutine on the product every time it is considered for
    delivery.  If check=False, then only the "available" subroutine
    is called, which will not be updated unless another Task marks
    the product as available.
    @param product the produtil.datastore.Product
    @param check do we need to call the product's check()
      function?  This is needed if it is a produtil.datastore.UpstreamFile
    @param destname optional: the destname argument to comfile()
      which will be used to generate the delivered filename in
      the COM directory.
    @returns self
    @throws TypeError if check is not a bool"""
    if not isinstance(check,bool):
        complaint='In d_wrfprod, check must be a bool, not a %s %s'%(
            type(check).__name__,repr(check))
        raise TypeError(complaint)

    (comfile,combn)=self.comfile(product,destname=destname)
    outproduct=FileProduct(dstore=self.dstore,prodname=combn,
                           category=self.taskname,location=comfile)
    outproduct.location=comfile
    # check is known to be a bool at this point.
    self._wrfprod.append( (product,outproduct,check) )
    return self
323 
def run(self,check_all=False):
    """!@brief watch for files to show up, delivering them when they do
    @details Keeps watching for WRF files to show up, copying them
    when they do.  This sets the state to RUNNING and then calls
    self.run_helper repeatedly, sleeping five seconds after each
    call that returns a true value.  It does not return until the
    state is something other than RUNNING.
    @param check_all if True, all non-delivered products have
      product.check() called on them"""
    self.state=RUNNING
    logger=self.log()
    while self.state==RUNNING:
        keep_waiting=self.run_helper(False,check_all)
        if not keep_waiting:
            # Go straight back to the loop test without sleeping.
            continue
        logger.info('Sleep 5...')
        time.sleep(5)
        logger.info('       ...done sleeping.')
339 
def deliver_group(self,group,check_all=False):
    """!deliver files to COM

    @protected
    This is an internal implementation function.  Do not call it
    directly.  Takes a list of tuples containing an upstream
    product, a downstream product to deliver, and a boolean
    telling whether to check() the upstream product.  Delivers
    available products, holding a per-file lock during each
    delivery.  A file whose lock is held elsewhere, or whose
    delivery raises an exception, is left for a later call.

    @param group either self._initial, self._wrfprod or self._final
    @param check_all if True, run product.check() on all products
    @returns a tuple containing two booleans: the first is True iff
      something was delivered, and the second is True iff
      something is left in the group that has not been delivered
      yet."""
    did_something=False
    more_to_do=False
    logger=self.log()
    lockdir=os.path.join(self.getdir('lockdir'),self.taskname)
    for inprod,outprod,check in group:
        logger.debug(
            'COPYWRF ITEM: inprod=%s outprod=%s check=%s check_all=%s'%(
                repr(inprod),repr(outprod),repr(check),repr(check_all)))
        # If nothing checks the upstream product here, it may simply
        # be that nobody has marked it available yet.
        messagemore=''
        if not check and not check_all:
            messagemore=' or post has not posted it yet'
        try:
            if not outprod.available:
                available=inprod.available
                if not available and ( check or check_all ):
                    inprod.check()
                    available=inprod.available
                if available:
                    logger.info('%s: delivering.'%(
                            str(outprod.location),))
                    lockfile=os.path.join(
                        lockdir,os.path.basename(inprod.location))
                    # One lock file per upstream file name.
                    # NOTE(review): max_tries=1 presumably means a
                    # single locking attempt -- confirm.
                    locker=produtil.locking.LockFile(
                        filename=lockfile,max_tries=1)
                    try:
                        with locker:
                            ifrom=inprod.location
                            copier=self.compression_copier(ifrom)
                            outprod.deliver(frominfo=ifrom,copier=copier)
                            did_something=True
                    except produtil.locking.LockHeld:
                        logger.info(
                            ' Nope.  Another process is delivering this '
                            'file right now.  Moving on.')
                        more_to_do=True
                else:
                    logger.info('%s: not yet available%s.'
                                %(str(inprod.location),messagemore))
                    more_to_do=True
        except Exception as e:
            # Best effort: log the problem and try again next time.
            more_to_do=True
            logger.warning('%s: trouble delivering: %s\n'%(
                    repr(outprod.location),str(e)),exc_info=True)
    return ( did_something, more_to_do )
def unrun(self):
    """!@brief delete delivered files
    @details Calls the undeliver function on all products,
    deleting them from the destination.  Product objects'
    undeliver() functions are called to achieve this.  All three
    delivery groups are covered: initial, wrfprod and final (the
    final group used to be skipped)."""
    for group in (self._initial,self._wrfprod,self._final):
        for _inprod,outprod,_check in group:
            outprod.undeliver()
def runpart(self,check_all=False):
    """!@brief deliver one output file and return.
    @details Calls run_helper() with runpart=True, so that it
    returns right after a delivery.  run_helper() sets the state to
    COMPLETED when it finds nothing left to deliver.  Its return
    value is discarded.
    @param check_all Optional.  If True, forces a call to check()
      on all undelivered products, even if those products are not
      checked by default."""
    self.run_helper(True,check_all=check_all)
def run_helper(self,runpart,check_all=False):
    """!@brief internal implementation of run() and runpart()
    @details This is the internal implementation of run and
    runpart.  It updates the state of the WRF task, then delivers
    the initial and wrfprod groups if that state is RUNNING or
    COMPLETED, and the final group if it is COMPLETED.
    @param runpart If runpart=True, run_helper() will return
      immediately after a group delivers something.
    @param check_all Optional.  If True, forces a call to check()
      on all undelivered products, even if those products are not
      checked by default.
    @returns False, after setting the state to COMPLETED, if no
      examined group has anything left to deliver.  Returns True
      otherwise."""
    self._wrftask.update_state()
    state=self._wrftask.state
    completed=( state==COMPLETED )
    started=( state==RUNNING or completed )

    logger=self.log()
    logger.info('wrf task state=%d so started=%s and completed=%s'
                %(self._wrftask.state,repr(started),repr(completed)))

    # A group that is not examined counts as having nothing pending.
    more_init=more_para=more_final=False

    if started:
        # Files that existed before WRF began:
        (delivered,more_init)=self.deliver_group(
            self._initial,check_all)
        if runpart and delivered:
            return True
        # Files that WRF writes while it runs:
        (delivered,more_para)=self.deliver_group(
            self._wrfprod,check_all)
        if runpart and delivered:
            return True

    if completed:
        # Files that are only complete once WRF has exited:
        (delivered,more_final)=self.deliver_group(
            self._final,check_all)
        if runpart and delivered:
            return True

    if more_init or more_para or more_final:
        return True

    # NOTE(review): this point is also reached when the WRF task is
    # neither RUNNING nor COMPLETED, since no group is examined then.
    # Confirm that marking this task COMPLETED is intended in that
    # case.
    self.state=COMPLETED
    logger.info('nothing left to deliver')
    return False
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
Create namelists, monitor wrf simulations, generate filenames.
Definition: wrf.py:1
def netcdfver(filename)
What is the NetCDF version of this file?
Definition: fileop.py:177
def confstrinterp(self, string, section=None, **kwargs)
Alias for self.icstr for backward compatibility.
Definition: hwrftask.py:319
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
Definition: locking.py:55
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
Handles file locking using Python "with" blocks.
Definition: locking.py:1
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
def remove_file
Deletes the specified file.
Definition: fileop.py:251
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def d_initial
deliver a file generated before WRF starts
Definition: copywrf.py:227
def d_final
deliver a file generated at the end of the WRF run
Definition: copywrf.py:247
def d_wrfprod
deliver a file generated during the WRF simulation such as a history or restart file ...
Definition: copywrf.py:298
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
def decompression_copier(self, src)
returns a decompression copier for deliver_file
Definition: copywrf.py:166
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
def _deliver_to_group
internal function that arranges for future file delivery
Definition: copywrf.py:268
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
out_prefix
Prefix to prepend to output filenames after the com path.
Definition: copywrf.py:87
def run_helper
internal implementation of run() and runpart()
Definition: copywrf.py:417
def runpart
deliver one output file and return.
Definition: copywrf.py:409
Stores products and tasks in an sqlite3 database file.
Definition: datastore.py:1
def comfile
get the full path to a file
Definition: copywrf.py:192
_wrfprod
The list of deliveries to make during WRF execution.
Definition: copywrf.py:89
_wrftask
The wrftask argument to init, the WRF simulation being monitored.
Definition: copywrf.py:86
_could_not_find_ncks
True if we searched for ncks, could not find it, and gave up looking.
Definition: copywrf.py:92
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def run
watch for files to show up, delivering them when they do
Definition: copywrf.py:324
def __init__(self, dstore, conf, section, wrftask, out_prefix, **kwargs)
WRFCopyTask constructor.
Definition: copywrf.py:71
def deliver_group
deliver files to COM
Definition: copywrf.py:340
def unrun(self)
delete delivered files
Definition: copywrf.py:400
wrf file delivery task
Definition: copywrf.py:67
Automates locking of a lockfile.
Definition: locking.py:66
_initial
The list of WRF initial state deliveries.
Definition: copywrf.py:88
def ncks_path(self)
Returns the path to ncks.
Definition: copywrf.py:118
def compression_copier
creates and returns a compression_copier for deliver_file
Definition: copywrf.py:135
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
Definition: fileop.py:573
_ncks_path
The path to ncks or the constant False if ncks is missing or unused.
Definition: copywrf.py:91