1 """!Contains the WRFCopyTask, which delivers WRF output and input.
3 This module contains the implementation of WRFCopyTask, which delivers
4 WRF input and output files. It maintains three sets of deliveries:
6 * initial - files created before the wrf execution begins
7 * wrfprod - files created by wrf during its execution
8 * final - files created at the end of the wrf execution, or just after wrf ends
10 The WRFCopyTask.run() and WRFCopyTask.runpart() functions monitor the WRF
11 execution and copy files as needed.
16 wrf=WRFSimulation(...)
17 ... add domains to the wrf simulation ...
18 sim=ExternalWRFTask(dstore,conf,section,wrf,...more stuff...)
19 wrfcopier=WRFCopyTask(dstore,conf,"wrf_copier",sim,
20 "{vit[stormname]}{vit[stormid3]}.{YMDH}.")
21 wrfcopier.d_initial('namelist.input')
22 wrfcopier.d_initial('wrfinput_d01')
23 wrfcopier.d_initial('wrfbdy_d01')
24 wrfcopier.d_initial('fort.65')
27 for product in runwrf.products(time=3600*fhr,stream="history"):
28 wrfcopier.d_wrfprod(product,check=False)
30 wrfcopier.d_final('track_d03.patcf')
34 This example will monitor an external WRF simulation. It will deliver
35 four files at the beginning of the simulation: namelist.input,
36 wrfinput_d01, wrfbdy_d01 and fort.65. One file will be delivered at
37 the end: track_d03.patcf. All three-hourly history stream files
38 (wrfout_d...) will be delivered as they become available. The files
39 will be delivered to com, prepending the storm name, id and cycle.
41 For example, this file
42 wrfout_d01_2016-08-14_09:00:00
43 would be delivered to
44 /path/to/com/invest97l.2016081400.wrfout_d01_2016-08-14_09:00:00
45 due to the wrfcopier.d_wrfprod() call.
47 All of the delivery happens in the last line of the code example when
48 wrfcopier.run() is called. That call will not return until all files
49 are delivered or the WRF model fails.
52 __all__ = [
'WRFCopyTask']
58 from produtil.run import checkrun, run, exe, bigexe, mpi, mpirun
60 FileProduct,UpstreamFile
65 __all__=[
'WRFCopyTask']
68 """!@brief wrf file delivery task
69 @details This is a Task that copies WRF input and output files
70 from the WRF run directory to the COM directory."""
71 def __init__(self,dstore,conf,section,wrftask,out_prefix,**kwargs):
72 """!@brief WRFCopyTask constructor
73 @details Constructor for the WRFCopyTask
74 @param dstore the produtil.datastore.Datastore
75 @param conf the hwrf.config.HWRFConfig
76 @param section the section to use in conf
77 @param wrftask the task that runs WRF. This should be an
78 hwrf.wrf.ExternalWRFTask, or a subclass of
79 hwrf.fcsttask.WRFTaskBase
80 @param out_prefix output file prefix, a string suitable for
81 passing into hwrf.config.HWRFConfig.strinterp()
82 @param kwargs passed to the superclass constructor"""
83 if 'outdir' not in kwargs:
84 kwargs[
'outdir']=conf.getdir(
'com')
85 super(WRFCopyTask,self).
__init__(dstore,conf,section,**kwargs)
119 """!@brief Returns the path to ncks.
120 @details Returns the path to the ncks program, used to convert
121 between NetCDF 3 and compressed NetCDF 4 file formats.
122 Returns None if ncks cannot be found. This function will only
123 search for ncks once, and will cache the result. Set
124 self._ncks_path=False to force a recheck."""
126 ncks=self.
getexe(
'ncks',
'')
136 """!@brief creates and returns a compression_copier for deliver_file
137 @details Returns the object that should be sent as the "copier"
138 argument to produtil.fileop.deliver_file() to copy the given
139 source file. This is either None, or a function that calls
140 ncks to compress NetCDF files. If a vsubset argument is
141 present, the file is subsetted, retaining only the variables
142 vsubset (a comma separated list).
143 @param src the source file
144 @param vsubset unused, but may one day be used to subset the file
145 @returns None if the source file is not NetCDF. If it is
146 NetCDF, then a copy(s,t,x) function is returned, suitable for
147 passing to the copy argument of
148 produtil.fileop.deliver_file()"""
162 checkrun(bigexe(ncks)[
'-4',
'-L',
'6',s,t]<
'/dev/null',
167 """!@brief returns a decompression copier for deliver_file
168 @details Returns an object that has the reverse effect of
169 self.compression_copier. This will uncompress files that
170 compression_copier would compress. NetCDF files will
171 all be converted to 64-bit indexing NetCDF 3 files.
172 @param src the source file
173 @returns None if the source file is not NetCDF. If it is
174 NetCDF, then a copy(s,t,x) function is returned, suitable for
175 passing to the copy argument of produtil.fileop.deliver_file()"""
189 checkrun(bigexe(ncks)[
'-6',s,t]<
'/dev/null',logger=logger)
193 """!@brief get the full path to a file
194 @details Generates a full path to the delivery location of the
195 specified source file. Returns the full path and the basename
196 as a tuple.
197 @returns a tuple (path,basename) where the path is the full
198 path to the file, and the basename is os.path.basename(path)
199 @param orig the original filename
200 @param destname optional: the desired destination name format
201 which will be sent through hwrf.hwrftask.HWRFTask.confstrinterp()
202 to generate the final destination filename"""
203 if(isinstance(orig,Product)):
204 bn=os.path.basename(str(orig.location))
206 bn=os.path.basename(str(orig))
212 '([0-9][0-9][0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])'
213 '[_.-]([0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])[^/]*$',
214 r'\1-\2-\3_\4:\5:\6',bn)
216 '([0-9][0-9][0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])'
217 '[_.-]([0-9][0-9])[_.-]([0-9][0-9])[_.-]([0-9][0-9])[^/]*$',
218 r'\1-\2-\3_\4:\5:00',bn)
223 inname_colon=bn_colon,
224 inname_colon_s00=bn_colon_s00)
225 return ( os.path.join(self.
outdir,fullbn), fullbn )
228 """!@brief deliver a file generated before WRF starts
229 @details Requests delivery of a file that is created before the
230 wrf.exe invocation. The "inprod" may be a Product or a
231 filename relative to the WRF run directory. The optional
232 "check" argument enables calling the potentially expensive
233 "check" subroutine on the upstream product every time it is
234 considered for delivery. If the input is a Product, and
235 check=False, then only the "available" subroutine is called,
236 which will not be updated unless another Task marks the
237 product as available. The default is check=False for Products
238 and check=True for filenames.
239 @param inprod the input produtil.datastore.Product
240 @param check do we need to call the product's check()
241 function? This is needed if it is a produtil.datastore.UpstreamFile
242 @param destname optional: the destname argument to comfile()
243 which will be used to generate the delivered filename in
244 the COM directory."""
247 def d_final(self,inprod,check=None,destname=None):
248 """!@brief deliver a file generated at the end of the WRF run
249 @details Requests delivery of a file created by WRF that is not
250 complete until the WRF exits. Examples of this are the
251 wrfdiag, hifreq and patcf files. These files will be
252 delivered when the underlying WRF Task has a state of
253 COMPLETED. The optional "check" argument enables calling the
254 potentially expensive "check" subroutine on the upstream
255 product every time it is considered for delivery. If the
256 input is a Product, and check=False, then only the "available"
257 subroutine is called, which will not be updated unless another
258 Task marks the product as available. The default is
259 check=False for Products and check=True for filenames.
260 @param inprod the input produtil.datastore.Product
261 @param check do we need to call the product's check()
262 function? This is needed if it is a produtil.datastore.UpstreamFile
263 @param destname optional: the destname argument to comfile()
264 which will be used to generate the delivered filename in
265 the COM directory."""
268 def _deliver_to_group(self,group,inprod,check=None,destname=None):
269 """!@brief internal function that arranges for future file delivery
270 @details Do not call this function directly. It is the internal
271 implementation of d_initial and d_final. Call those functions
272 instead.
273 @param group which group does this belong to? Should be
274 self._initial, self._wrfprod or self._final
275 @param inprod the input produtil.datastore.Product
276 @param check do we need to call the product's check()
277 function? This is needed if it is a produtil.datastore.UpstreamFile
278 @param destname optional: the destname argument to comfile()
279 which will be used to generate the delivered filename in
280 the COM directory."""
281 (comfile,combn)=self.
comfile(inprod,destname=destname)
282 if(isinstance(inprod,Product)):
284 if check
is None: check=
False
287 wrffile=os.path.join(self._wrftask.location,inprod)
288 upstream=UpstreamFile(dstore=self.
dstore,prodname=combn,
289 category=
"%s-upstream"%(self.
taskname,),location=wrffile)
290 upstream.location=wrffile
296 group.append( (upstream,product,bool(check)) )
299 """!@brief deliver a file generated during the WRF simulation
300 such as a history or restart file
301 @details Requests delivery of a WRF I/O subsystem output file.
302 The "product" argument must be a Product object. The optional
303 argument "check" enables calling the potentially expensive
304 "check" subroutine on the product every time it is considered
305 for delivery. If check=False, then only the "available"
306 subroutine is called, which will not be updated unless another
307 Task marks the product as available.
308 @param product the produtil.datastore.Product
309 @param check do we need to call the product's check()
310 function? This is needed if it is a produtil.datastore.UpstreamFile
311 @param destname optional: the destname argument to comfile()
312 which will be used to generate the delivered filename in
313 the COM directory."""
314 if not isinstance(check,bool):
315 raise TypeError(
'In d_wrfprod, check must be a bool, not a %s %s'%(
316 type(check).__name__,repr(check)))
317 (comfile,combn)=self.
comfile(product,destname=destname)
318 outproduct=FileProduct(dstore=self.
dstore,prodname=combn,
319 category=self.
taskname,location=comfile)
320 outproduct.location=comfile
321 self._wrfprod.append( (product,outproduct,bool(check)) )
324 def run(self,check_all=False):
325 """!@brief watch for files to show up, delivering them when they do
326 @details Keeps watching for WRF files to show up, copying them when
327 they do. This is just a simple wrapper around self.runpart,
328 and does not return until runpart sets the state to something
329 other than RUNNING.
330 @param check_all if True, all non-delivered products have
331 product.check() called on them"""
334 while self.
state==RUNNING:
336 logger.info(
'Sleep 5...')
338 logger.info(
' ...done sleeping.')
341 """!deliver files to COM
344 This is an internal implementation function. Do not call it
345 directly. Takes a list of tuples containing an upstream
346 product, a downstream product to deliver, and a boolean
347 telling whether to check() the upstream product. Delivers
348 available products. Returns a tuple containing two booleans:
349 the first is True iff something was delivered, and the second
350 is True iff something is left in the group that has not been
351 delivered yet. The check_all argument can be used to force a
352 check on all products by setting check_all=True.
354 @param group either self._initial, self._wrfprod or self._final
355 @param check_all if True, run product.check() on all products"""
360 for inprod,outprod,check
in group:
362 'COPYWRF ITEM: inprod=%s outprod=%s check=%s check_all=%s'%(
363 repr(inprod),repr(outprod),repr(check),repr(check_all)))
365 if not check
and not check_all:
366 messagemore=
' or post has not posted it yet'
368 if not outprod.available:
369 available=inprod.available
370 if not available
and ( check
or check_all ):
372 available=inprod.available
374 logger.info(
'%s: delivering.'%(
375 str(outprod.location),))
376 lockfile=os.path.join(
377 lockdir,os.path.basename(inprod.location))
379 filename=lockfile,max_tries=1)
382 ifrom=inprod.location
384 outprod.deliver(frominfo=ifrom,copier=copier)
388 ' Nope. Another process is delivering this '
389 'file right now. Moving on.')
392 logger.info(
'%s: not yet available%s.'
393 %(str(inprod.location),messagemore))
395 except Exception
as e:
397 logger.warning(
'%s: trouble delivering: %s\n'%(
398 repr(outprod.location),str(e)),exc_info=
True)
399 return ( did_something, more_to_do )
401 """!@brief delete delivered files
402 @details Calls the undeliver function on all products,
403 deleting them from the destination. Product objects' undeliver()
404 functions are called to achieve this. """
405 for inprod,outprod,check
in self.
_initial:
407 for inprod,outprod,check
in self.
_wrfprod:
410 """!@brief deliver one output file and return.
411 @details Delivers one output file and returns. Sets the state to
412 COMPLETED if all files are delivered.
413 @param check_all Optional. If True, forces a call to check()
414 on all undelivered products, even if those products are not
415 checked by default."""
418 """!@brief internal implementation of run() and runpart()
419 @details This is the internal implementation of run and
420 runpart. It delivers files, and returns False if all files
422 @param runpart If runpart=True, run_helper() will return immediately
423 after delivering one file.
424 @param check_all Optional. If True, forces a call to check()
425 on all undelivered products, even if those products are not
426 checked by default."""
427 self._wrftask.update_state()
428 state=self._wrftask.state
429 started = (state==RUNNING
or state==COMPLETED)
430 completed = ( state == COMPLETED )
432 initial_complete=
False
433 parallel_complete=
False
435 logger.info(
'wrf task state=%d so started=%s and completed=%s'
436 %(self._wrftask.state,repr(started),repr(completed)))
443 if did_something
and runpart:
449 if did_something
and runpart:
455 if did_something
and runpart:
459 if not more_init
and not more_para
and not more_final:
462 logger.info(
'nothing left to deliver')
This module provides a set of utility functions to do filesystem operations.
Create namelists, monitor wrf simulations, generate filenames.
def netcdfver(filename)
What is the NetCDF version of this file?
def confstrinterp(self, string, section=None, kwargs)
Alias for self.icstr for backward compatibility.
This exception is raised when a LockFile cannot lock a file because another process or thread has loc...
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Handles file locking using Python "with" blocks.
taskname
Read-only property: the name of this task.
The base class of tasks run by the HWRF system.
def remove_file
Deletes the specified file.
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
def d_initial
deliver a file generated before WRF starts
def d_final
deliver a file generated at the end of the WRF run
def d_wrfprod
deliver a file generated during the WRF simulation such as a history or restart file ...
A shell-like syntax for running serial, MPI and OpenMP programs.
Base class of tasks run by HWRF.
def decompression_copier(self, src)
returns a decompression copier for deliver_file
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
def _deliver_to_group
internal function that arranges for future file delivery
outdir
The directory in which this task should deliver its final output.
out_prefix
Prefix to prepend to output filenames after the com path.
def run_helper
internal implementation of run() and runpart()
def runpart
deliver one output file and return.
Stores products and tasks in an sqlite3 database file.
def comfile
get the full path to a file
_wrfprod
The list of deliveries to make during WRF execution.
_wrftask
The wrftask argument to init, the WRF simulation being monitored.
_could_not_find_ncks
True if we searched for ncks, could not find it, and gave up looking.
def log
Obtain a logging domain.
def run
watch for files to show up, delivering them when they do
def __init__(self, dstore, conf, section, wrftask, out_prefix, kwargs)
WRFCopyTask constructor.
def deliver_group
deliver files to COM
def unrun(self)
delete delivered files
Automates locking of a lockfile.
_initial
The list of WRF initial state deliveries.
def ncks_path(self)
Returns the path to ncks.
def compression_copier
creates and returns a compression_copier for deliver_file
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
_ncks_path
The path to ncks or the constant False if ncks is missing or unused.