HWRF  trunk@4391
exhwrf_products.py
1 #! /usr/bin/env python
2 
3 ##@namespace scripts.exhwrf_products
4 # Runs regribbing operations on the output of exhwrf_products, and
5 # runs the GFDL vortex tracker on the regribbed output. Note that this
6 # script is restartable: if it fails, and you call it again, it will
7 # pick up where it left off. To force a reprocessing of the entire
8 # post-processing system, call exhwrf_unpost first.
9 
import logging, os, sys

import produtil.batchsystem
import produtil.log
import hwrf_expt
import hwrf_alerts
import hwrf_wcoss

from produtil.log import jlogger
from produtil.cd import NamedDir
from produtil.run import mpi, mpirun, checkrun
20 
def gribber():
    """!Runs the hwrf.gribtask.GRIBTask serially on this thread."""
    conf = hwrf_expt.conf
    jlogger.info(conf.strinterp(
        'config', '{stormlabel}: starting regribbing job for {out_prefix}'))
    work_logger = logging.getLogger()
    with NamedDir(hwrf_expt.WORKhwrf, logger=work_logger) as scrub:
        # Mark any partially-made products as incomplete so the task
        # redoes them, then (re)run the regribbing task itself.
        hwrf_expt.gribber.uncomplete()
        hwrf_expt.gribber.run()
    jlogger.info(conf.strinterp(
        'config', '{stormlabel}: completed regribbing job for {out_prefix}'))
31 
def tracker(n):
    """!Runs the hwrf.tracker.TrackerTask on one thread.
    @param n the domain of interest: 1 2 or 3"""
    jlogger.info(hwrf_expt.conf.strinterp(
        'config',
        '{stormlabel}: starting domain {dom} tracker job for {out_prefix}',
        dom=n))
    with NamedDir(hwrf_expt.WORKhwrf, logger=logging.getLogger()) as scrub:
        # Dispatch on the requested domain.  Any other value of n runs
        # nothing, matching the original if/elif chain.
        if n == 1:
            hwrf_expt.trackerd01.run()
        elif n == 2:
            hwrf_expt.trackerd02.run()
        elif n == 3:
            hwrf_expt.tracker.run()
    jlogger.info(hwrf_expt.conf.strinterp(
        'config', '{stormlabel}: completed domain {dom} tracker job '
        'for {out_prefix}', dom=n))
48 
def copier():
    """!Runs the hwrf.copywrf.WRFCopyTask to copy native WRF input and
    output files to COM, and then runs the gribber(). Does this on one thread."""
    conf = hwrf_expt.conf
    if conf.getbool('config', 'post_runs_wrfcopier', False):
        # The post job owns the wrfcopier work; skip it here.
        jlogger.info('Products job will not run wrfcopier, post will do it.')
    else:
        jlogger.info(conf.strinterp(
            'config', '{stormlabel}: starting wrfcopier job for {out_prefix}'))
        with NamedDir(hwrf_expt.WORKhwrf, logger=logging.getLogger()) as scrub:
            hwrf_expt.wrfcopier.run(check_all=True)
        jlogger.info(conf.strinterp(
            'config', '{stormlabel}: completed wrfcopier job for {out_prefix}'))
    # Either way, this rank continues on to regribbing.
    gribber()
64 
def products():
    """!Runs the hwrf.nhc_products.NHCProducts on multiple threads."""
    conf = hwrf_expt.conf
    jlogger.info(conf.strinterp(
        'config', '{stormlabel}: starting nhc_products job for {out_prefix}'))
    work_logger = logging.getLogger()
    with NamedDir(hwrf_expt.WORKhwrf, logger=work_logger) as scrub:
        hwrf_expt.nhcp.run()
    jlogger.info(conf.strinterp(
        'config', '{stormlabel}: completed nhc_products job for {out_prefix}'))
73 
def wavepost():
    """!Runs the WW3 wave post-processing task (hwrf_expt.ww3post) when a
    WW3 wave model is enabled; otherwise logs that the job is skipped."""
    conf = hwrf_expt.conf
    run_wave = conf.getbool('config', 'run_wave')
    wave_model = conf.getstr('config', 'wave_model')
    if run_wave and wave_model == 'WW3':
        jlogger.info(conf.strinterp(
            'config', '{stormlabel}: starting wave_post job for {out_prefix}'))
        hwrf_expt.ww3post.run()
        jlogger.info(conf.strinterp(
            'config', '{stormlabel}: completed wave_post job for {out_prefix}'))
    else:
        # No wave model configured, or it is not WW3: nothing to do.
        jlogger.info(conf.strinterp(
            'config',
            '{stormlabel}: no wave model; skip wave_post job for {out_prefix}'))
86 
def starter(dryrun):
    """!Main program for subprocesses. Calls tracker() for the last
    one or three threads (depending on whether extra_trackers=yes in
    the [config] section). Calls copier() for the last non-tracker
    rank. Calls the gribber() on all other ranks.
    @param dryrun if True, only decide and return this rank's role
    name without running anything."""
    conf = hwrf_expt.conf
    myrank = int(os.environ['SCR_COMM_RANK'])
    count = int(os.environ['SCR_COMM_SIZE'])
    logger = conf.log('exhwrf_products')
    extra_trackers = conf.getbool('config', 'extra_trackers', False)

    ngribbers = 0
    ncopiers = 0
    run = None
    # Walk every rank (not just ours) so that the worker counts are
    # computed identically on all ranks; record our own assignment
    # when we reach myrank.
    for rank in range(count):
        mine = (rank == myrank)
        if rank == 0:
            logger.info('Rank %d runs d03 tracker' % rank)
            if mine:
                run = lambda: tracker(3)
                whoami = 'tracker'
        elif rank == 1 and extra_trackers:
            logger.info('Rank %d runs d02 tracker' % rank)
            if mine:
                run = lambda: tracker(2)
                whoami = 'd02tracker'
        elif rank == 2 and extra_trackers:
            logger.info('Rank %d runs d01 tracker' % rank)
            if mine:
                run = lambda: tracker(1)
                whoami = 'd01tracker'
        elif rank == count - 1:
            logger.info('Rank %d runs wrfcopier' % rank)
            ncopiers += 1
            if mine:
                run = copier
                whoami = 'copier%d' % ncopiers
        else:
            logger.info('Rank %d runs gribber' % rank)
            ngribbers += 1
            if mine:
                run = gribber
                whoami = 'gribber%d' % ngribbers

    # Abort unless we have at least one copier and two gribbers:
    # d03 tracker + copier + two gribbers = 4 ranks minimum, plus two
    # more ranks when the extra d01/d02 trackers are enabled.
    if ncopiers < 1 or ngribbers < 2:
        need = 6 if extra_trackers else 4
        msg = 'Cannot run products job with %d processors with these settings.'\
            ' I require at least %d.' % (count, need)
        logger.critical(msg)
        sys.exit(2)

    if dryrun:
        return whoami
    else:
        run()
139 
    # NOTE(review): this is the body of slave_main(); the
    # 'def slave_main():' header (doxygen source line 140) and a few
    # setup lines (doxygen source lines 147-149, presumably produtil
    # setup / hwrf_expt initialization -- TODO confirm against the
    # repository) are elided from this rendering.  Code is unchanged.
    """!This is run multiple times in parallel, once in each
    subprocess. It sets up the environment and logging settings and
    then runs the starter() function."""
    # This worker's rank and the communicator size, as exported by
    # the mpiserial wrapper:
    rank=int(os.environ['SCR_COMM_RANK'])
    count=int(os.environ['SCR_COMM_SIZE'])
    print 'MPI communicator: rank=%d size=%d'%(rank,count)
    # Substitution values for expanding the log-path templates below:
    subdict={ 'RANK':rank, 'COUNT':count, 'WHO':'regribber',
              'jobid':produtil.batchsystem.jobid(),
              'WORKhwrf':hwrf_expt.conf.getdir('WORKhwrf') }

    # Dry run of starter() just to learn this rank's role name:
    whoami=starter(dryrun=True)
    subdict['THREAD_WHOAMI']=whoami

    if whoami.find('tracker')>=0:
        # Redirect stdout and stderr to one stream for tracker job:
        if 'TRACKER_LOGS' in os.environ:
            r=os.environ.get('TRACKER_LOGS')
        else:
            r=hwrf_expt.conf.strinterp(
                'config','%(WORKhwrf)s/%(jobid)s-%(THREAD_WHOAMI)s.log')
        rstdout=r % dict(subdict, WHO='tracker', STREAM='out')
        rstderr=r % dict(subdict, WHO='tracker', STREAM='err')
        produtil.log.mpi_redirect(stdoutfile=rstdout,stderrfile=None,
                                  threadname='tracker')
    else:
        # Regribber and copier have one file per stream (out, err).
        if 'REGRIBBER_LOGS' in os.environ:
            r=os.environ['REGRIBBER_LOGS']
        else:
            r=hwrf_expt.conf.strinterp(
                'config',
                '%(WORKhwrf)s/%(jobid)s-%(THREAD_WHOAMI)s.log',
                threadwhoami=whoami)
        rstdout=r % dict(subdict, WHO='regribber', STREAM='out')
        rstderr=r % dict(subdict, WHO='regribber', STREAM='err')
        logging.getLogger('hwrf').warning(
            'Redirecting regribber %d to: stderr=%s stdout=%s'%
            ( rank, rstderr, rstdout ))
        produtil.log.mpi_redirect(stdoutfile=rstdout,stderrfile=rstderr,
                                  threadname='regrib%d'%(rank,))
    # Now run this rank's role for real:
    whoami=starter(dryrun=False)
185 
def launchself():
    """!Launches an MPI program that will call this script in multiple
    threads using the mpiserial program.

    Sets LAUNCH_SELF=no so the child copies of this script do not
    recursively re-launch themselves, then runs this script on all MPI
    ranks via mpiserial."""
    # (The 'def launchself():' header was missing from this source
    # rendering; restored to match the call in doit().)
    # Instruct further processes not to re-launch scripts via mpirun:
    os.environ['LAUNCH_SELF']='no'
    # Launch copies of myself, one per MPI rank. We must use mpiserial
    # for this because we need the SCR_COMM_RANK variable:
    checkrun(mpirun(mpi(hwrf_expt.conf.getexe('mpiserial','mpiserial'))
                    [os.path.realpath(__file__)],allranks=True))
    # Calling checkrun ensures the program exits abnormally if
    # mpirun.lsf (or whatever you use) exits abnormally.
197 
def doit():
    """!Main entry point. Slave processes (launched by mpiserial)
    just call slave_main to pass control on to tracker(), gribber() or
    copier(). The main process (which calls mpiserial) will wait for
    mpiserial to exit, and then run the products() function."""
    # NOTE(review): several statements of this function (doxygen source
    # lines 203, 210, 212, 216-218 -- presumably hwrf_expt/produtil
    # initialization and hwrf_alerts registration, per the trailing
    # index -- TODO confirm against the repository) are elided from
    # this rendering.  Visible code is unchanged.
    if 'SCR_COMM_RANK' not in os.environ \
            and os.environ.get('LAUNCH_SELF','yes')=='yes':
        # This is the top level of the job: we are NOT inside an
        # mpi_serial call.

        # Initialize the hwrf_expt and re-call any callbacks for completed products:
        logger=logging.getLogger('exhwrf_products')
        logger.info('Ensure incomplete products are marked as such...')
        hwrf_expt.gribber.uncomplete()
        logger.info('Add alerts and delveries...')
        logger.warning('''Rerunning dbn_alert for prior jobs' posted files.''')
        hwrf_expt.gribber.call_completed_callbacks()

        # We're in the top-level job. Launch copies of ourself to run the
        # gribber and tracker:
        logger.warning('---------------------------------------------------')
        logger.warning('LAUNCH PARALLEL PORTION OF SCRIPT------------------')
        logger.warning('---------------------------------------------------')
        launchself()
        logger.warning('---------------------------------------------------')
        logger.warning('PARALLEL PORTION OF SCRIPT HAS ENDED---------------')
        logger.warning('---------------------------------------------------')

        # Gribber and tracker succeeded. Run the products job:
        # (only for the 126-hour forecast length; others skip it)
        if hwrf_expt.fcstlen == 126:
            products()
        else:
            logger.info('Forecast length is: %d ; Not running the products job.'%hwrf_expt.fcstlen)

        # Run wave post if needed:
        wavepost()
    else:
        # We're in a subprocess. Just run the gribber and tracker and return:
        slave_main()
# Script entry point: run doit() immediately at module execution time.
# Swap in the cProfile line below to profile the whole job instead.
#cProfile.run('doit()')
doit()
Change directory, handle temporary directories.
Definition: cd.py:1
Contains setup(), which initializes the produtil package.
Definition: setup.py:1
def gribber()
Runs the hwrf.gribtask.GRIBTask on one thread.
def starter(dryrun)
Main program for subprocesses.
Sets up signal handlers to ensure a clean exit.
Definition: sigsafety.py:1
def tracker(n)
Runs the hwrf.tracker.TrackerTask on one thread.
def doit()
Main entry point.
def init_module
Initializes the HWRF object structure.
Definition: hwrf_expt.py:384
def copier()
Runs the hwrf.copywrf.WRFCopyTask to copy native WRF input and output files to COM, and then runs the gribber().
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def setup(ignore_hup=False, dbnalert_logger=None, jobname=None, cluster=None, send_dbn=None, thread_logger=False, thread_stack=2**24, **kwargs)
Initializes the produtil package.
Definition: setup.py:15
def set_vars_for_products(logger)
Sets variables to speed up the products job on WCOSS.
Definition: hwrf_wcoss.py:47
def launchself()
Launches an MPI program that will call this script in multiple threads using the mpiserial program...
This subclass of TempDir takes a directory name, instead of generating one automatically.
Definition: cd.py:228
def add_wave_alerts()
DBN ALERTS #########################################################.
Definition: hwrf_alerts.py:99
def add_regrib_alerts()
Adds dbn alerts for GRIB products by adding DBNAlert objects to the hwrf_expt.gribber.
Definition: hwrf_alerts.py:123
def jobid
Get the batch job ID.
Definition: batchsystem.py:103
def add_tracker_alerts()
Adds dbn alerts for the tracker and requests delivery of the tracker to NHC deck locations.
Definition: hwrf_alerts.py:266
def mpi_redirect
Used to split to multiple logging streams.
Definition: log.py:226
def add_nhc_alerts()
Adds dbn alerts for the hwrf_nhc_products program's output by adding DBNAlert objects to the hwrf_exp...
Definition: hwrf_alerts.py:251
def slave_main()
This is run multiple times in parallel, once in each subprocess.
Configures logging.
Definition: log.py:1
def products()
Runs the hwrf.nhc_products.NHCProducts on multiple threads.