HWRF  trunk@4391
gsipost.py
1 """!Post-processes GSI input and output for data assimilation diagnostics."""
2 
3 ##@var __all__
4 # The list of symbols to export via "from hwrf.gsipost import *"
5 __all__ = ["GSIPost",'F_ORG','F_GES','F_ANL','F_ALL']
6 
7 import os, glob, shutil
10 
11 from produtil.run import run, checkrun, alias, mpi, mpirun, exe, bigexe
12 from produtil.cd import NamedDir
13 from produtil.fileop import deliver_file, make_symlink, remove_file
14 from hwrf.post import EGRIB1Product, check_post, link_post_fix
15 
16 ##@var F_ORG
17 # Flag to indiciate the original, downscaled parent model data is desired
18 F_ORG=1
19 
20 ##@var F_GES
21 # Flag to indicate the vortex-relocated data (first guess) is desired
22 F_GES=2
23 
24 ##@var F_ANL
25 # Flag to indiciate the final WRF input (after GSI and merge) is desired
26 F_ANL=4
27 
28 ##@var F_ALL
29 # Flag to indicate all initialization steps are desired
30 F_ALL=F_ORG|F_GES|F_ANL
31 
33  """!Post-processes GSI inputs and outputs.
34 
35  Runs the hwrf_post on GSI inputs and outputs from one or more of three stages.
36  * original data (F_ORG) - the original, downscaled, parent model data
37  * first guess (F_GES) - the original data, with the vortex relocated
38  * analysis (F_ANL) - the final data input to the forecast
39  This class is compatible with the hwrf.regrib and hwrf.gribtask,
40  and can be used in place of an hwrf.post.PostManyWRF.
41  """
42  def __init__(self,dstore,conf,section,**kwargs):
43  """!Constructor for GSIPost
44  @param dstore the produtil.datastore.Datastore database to use
45  @param conf the hwrf.config.HWRFConfig configuration information
46  @param section the section to use in conf
47  @param kwargs keyword arguments passed to the superclass constructor"""
48  super(GSIPost,self).__init__(dstore,conf,section,**kwargs)
49  self._domains=list()
50  self._wrfout=dict()
51 
52  self._wrforg=dict()
53  self._wrfges=dict()
54  self._wrfanl=dict()
55  self._org_products=dict()
56  self._ges_products=dict()
57  self._anl_products=dict()
58 
59  def add_case(self,domain,wrfout,org=None,ges=None,anl=None):
60  """!Adds another case to the list of those to post-process
61 
62  @param domain a WRFDomain to be processed
63  @param wrfout a wrfout file for internal use. It will be copied
64  and its fields will e replaced with the ges and/or analysis
65  file as needed
66  @param org Optional: original down-scaled parent domain data
67  @param ges Optional: a first guess wrfinput or restart file, generally after
68  vortex relocation
69  @param anl Optional: an analysis (GSI output) wrfinput or restart file
70  @note At least one of org, ges or anl must be given."""
71  assert(not (org is None and ges is None and anl is None))
72  assert(wrfout is not None)
73  assert(domain is not None)
74 
75  self._domains.append(domain)
76  #self._domsize[domain]=[int(mdeglat),int(mdeglon),int(mdegres)]
77  self._wrfout[domain]=wrfout
78 
79  if org is not None:
80  self._wrforg[domain]=org
81  self._org_products[domain]=EGRIB1Product(
82  self.dstore,category=self.taskname,
83  prodname=domain.name+'_org_'+org.prodname)
84 
85  if ges is not None:
86  self._wrfges[domain]=ges
87  self._ges_products[domain]=EGRIB1Product(
88  self.dstore,category=self.taskname,
89  prodname=domain.name+'_ges_'+ges.prodname)
90 
91  if anl is not None:
92  self._wrfanl[domain]=anl
93  self._anl_products[domain]=EGRIB1Product(
94  self.dstore,category=self.taskname,
95  prodname=domain.name+'_anl_'+anl.prodname)
96 
97  def run(self):
98  """!Executes the GSI post for all cases on all domains."""
99  logger=self.log()
100  good=True
101  for domain in self._domains:
102  logger.info('%s: process this domain'%(str(domain),))
103  wrfout=self._wrfout[domain]
104  if domain in self._org_products:
105  logger.info('%s: process org'%(str(domain),))
106  self._process(self._wrforg[domain],
107  domain,'org',wrfout,self._org_products[domain])
108  else:
109  logger.info('%s: has no org data'%(str(domain),))
110 
111  if domain in self._ges_products:
112  logger.info('%s: process ges'%(str(domain),))
113  self._process(self._wrfges[domain],
114  domain,'ges',wrfout,self._ges_products[domain])
115  else:
116  logger.info('%s: has no ges data'%(str(domain),))
117 
118  if domain in self._anl_products:
119  logger.info('%s: process anl'%(str(domain),))
120  self._process(self._wrfanl[domain],
121  domain,'anl',wrfout,self._anl_products[domain])
122  else:
123  logger.info('%s: has no anl data'%(str(domain),))
124  logger.info('Done processing domains.')
125 
126  def _process(self,inprod,domain,why,wrfout,outprod):
127  """!Internal function that implements most of run()
128 
129  Do not call directly. This is the implementation of
130  self.run: it runs the post for one case, on one domain."""
131  assert(inprod is not None)
132  assert(outprod is not None)
133  assert(wrfout is not None)
134  assert(domain is not None)
135  logger=self.log()
136  #assert(outprod.location) # should already be set in constructor
137  assert(wrfout.location)
138  wrfthere=wrfout.location
139  assert(wrfthere)
140  if not produtil.fileop.isnonempty(wrfthere):
142  '%s: is empty or nonexistent'%(wrfthere,))
143  if not inprod.location:
144  logger.info('%s: not available (location unknown)'%(
145  inprod.did,))
146  elif not inprod.available:
147  logger.info('%s (%s): not available according to database'%(
148  inprod.did, inprod.location))
149  shortname=domain.name+'_'+why+'_'+inprod.prodname
150  workdir=os.path.join(self.workdir,shortname)
151  if os.path.exists(workdir):
152  logger.info('%s: exists; will delete'%(workdir,))
153  shutil.rmtree(workdir)
154  with NamedDir(workdir,keep=not self.scrub,logger=logger):
155  diffwrf=alias(bigexe(self.getexe('hwrf_3dvar')))
156  post=alias(mpi(self.getexe('post')))
157 
158  # Copy the fields into the wrfout file:
159  deliver_file(wrfthere,'postinput',keep=True)
160  make_symlink(inprod.location,'ghost',force=True)
161  cmd=diffwrf['storm_relocate','ghost','flnm3','new_ght']
162  checkrun(cmd >= 'storm_relocate.log',logger=logger)
163  cmd=diffwrf['3dvar_update','postinput','new_ght']
164  checkrun(cmd >= '3dvar_update.log',logger=logger)
165 
166  # Delete any stray fort.* files:
167  for filename in glob.glob('fort.*'):
168  produtil.fileop.remove_file(filename,logger=logger)
169 
170  # Run the post:
171  datestamp=self.conf.cycle.strftime('%Y-%m-%d_%H:%M:%S')
172  with open('itag','wt') as itag:
173  itag.write("""postinput
174 netcdf
175 %s
176 NMM NEST
177 """ % (datestamp,))
178  cmd=mpirun(post,allranks=True) >= 'vpost.log'
179  needcrtm=self.confbool('needcrtm',False)
180  logger.info('link post fix files')
181  link_post_fix(self.getdir('FIXhwrf'),needcrtm,logger=logger)
182  fort14=self.confstr('control')
183  logger.info('%s: use this control file for gsi post'%(fort14,))
184  produtil.fileop.make_symlink(fort14,'fort.14',force=True,
185  logger=logger)
186  logger.info('Run post, log to vpost.log.')
187  ret=run(cmd,logger=logger)
188 
189  # Check to see if the post succeeded and get the center
190  # lat & lon of the domain for post-processing:
191  (ok,cenla,cenlo,filename)=check_post(ret,shortname,logger)
192  if not ok:
193  raise hwrf.exceptions.PostFailed('GSI post on '+shortname)
194  #toloc=outprod.location
195  toloc=os.path.join(self.outdir,shortname)
196  outprod.deliver(toloc,{'CENLA':cenla,'CENLO':cenlo,
197  'fromloc':filename},logger=logger)
198  return True
199 
200  def products(self,domains=None,domain=None,which_step=F_ALL,**kwargs):
201  """!Iterates over EGRIB1Product objects produced by this task.
202 
203  @param domains a list of WRFDomain objects. Only these will be iterated.
204  @param domain a single WRFDomain; only its products will be iterated
205  @param which_step which steps are of interest:
206  * F_ORG - original parent model data
207  * F_GES - first guess (vortex-relocated parent model data)
208  * F_ANL - analysis, the final input to the forecast
209  * any integer or of the above
210  * F_ALL - all products, the default
211  @param kwargs ignored"""
212  if domains is None and domain is not None:
213  domains=[domain]
214  elif domains is None:
215  domains=self._domains
216  for d in domains:
217  if which_step&F_ORG and d in self._org_products:
218  yield self._org_products[d]
219  if which_step&F_ANL and d in self._anl_products:
220  yield self._anl_products[d]
221  if which_step&F_GES and d in self._ges_products:
222  yield self._ges_products[d]
223 
224 
Change directory, handle temporary directories.
Definition: cd.py:1
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def add_case
Adds another case to the list of those to post-process.
Definition: gsipost.py:59
def getexe
Alias for hwrf.config.HWRFConfig.get() for the "exe" section.
Definition: hwrftask.py:403
taskname
Read-only property: the name of this task.
Definition: datastore.py:1134
The base class of tasks run by the HWRF system.
Definition: hwrftask.py:25
Post-processes GSI inputs and outputs.
Definition: gsipost.py:32
Raised upon errors that would cause a retry, in the PostOneWRF.run when passed the raiseall=True argu...
Definition: exceptions.py:416
def remove_file
Deletes the specified file.
Definition: fileop.py:251
dstore
Read-only property, an alias for getdatastore(), the Datastore in which this Datum resides...
Definition: datastore.py:557
def confbool
Alias for self.conf.getbool for section self.section.
Definition: hwrftask.py:287
Base class of tasks run by HWRF.
Definition: hwrftask.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def getdir
Alias for hwrf.config.HWRFConfig.get() for the "dir" section.
Definition: hwrftask.py:396
outdir
The directory in which this task should deliver its final output.
Definition: hwrftask.py:176
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Definition: fileop.py:333
def products(self, domains=None, domain=None, which_step=F_ALL, kwargs)
Iterates over EGRIB1Product objects produced by this task.
Definition: gsipost.py:200
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
Runs the GSI data assimilation on the HWRF system.
Definition: gsi.py:1
def __init__(self, dstore, conf, section, kwargs)
Constructor for GSIPost.
Definition: gsipost.py:42
workdir
The directory in which this task should be run.
Definition: hwrftask.py:156
def scrub(self)
Should temporary files be deleted as soon as they are not needed?
Definition: hwrftask.py:195
def log
Obtain a logging domain.
Definition: hwrftask.py:425
def run(self)
Executes the GSI post for all cases on all domains.
Definition: gsipost.py:97
Represents an E grid WRF-NMM GRIB1 file, and stores two metadata values: CENLA and CENLO which contai...
Definition: post.py:154
Raised when the post's input file is not available and raiseall=True in PostOneWRF.run.
Definition: exceptions.py:420
Runs the Unified Post Processor on outputs from the WRF-NMM, producing E grid GRIB files as EGRIB1Pro...
Definition: post.py:1
Exceptions raised by the hwrf package.
Definition: exceptions.py:1
def confstr
Alias for self.conf.getstr for section self.section.
Definition: hwrftask.py:261
def _process(self, inprod, domain, why, wrfout, outprod)
Internal function that implements most of run()
Definition: gsipost.py:126
def make_symlink
Creates a symbolic link "target" that points to "source".
Definition: fileop.py:677