HWRF  trunk@4391
hwrf_pull_inputs.py
1 #! /usr/bin/env python
2 
3 import os, sys, logging, getopt, re, StringIO
7 
8 from hwrf.numerics import to_datetime_rel
9 from produtil.run import exe
10 from hwrf.input import InputSource, DataCatalog
11 
12 ## @var inputscript
13 # The script from which the input list is read.
14 inputscript=None
15 
16 ## @var outputdir
17 # The output directory where data will reside
18 outputdir=None
19 
20 ## @var cycles
21 # The list of YYYYMMDDHH cycles to pull
22 cycles=None
23 
24 ## @var HOMEhwrf
25 # Parent directory of HWRF's parm/ and ush/
26 HOMEhwrf=None
27 
28 ## @var WORKhwrf
29 # Scrub area, the WORKhwrf variable in the [config] section of conf
30 WORKhwrf=None
31 
32 ## @var logger
33 # The logging.Logger object for logging messages.
34 logger=None
35 
36 ## @var conf_input_list
37 # A list of input files, relative to parm, to read in when making conf
38 conf_input_list = [
39  'hwrf_input.conf', 'hwrf.conf', 'hwrf_holdvars.conf',
40  'hwrf_basic.conf', 'system.conf' ]
41 
42 # def setup():
43 # """!Sets up the used parts of the produtil package. This does NOT
44 # call produtil.setup.setup(). Instead, it installs the signal
45 # handlers and sets up the Python logging module directly. No other
46 # parts of the produtil package are initialized."""
47 # # Install SIGTERM, SIGINT, etc. handlers. Needed to handle batch job
48 # # kills correctly.
49 # global logger
50 # produtil.sigsafety.install_handlers()
51 # global handler, logger
52 # # Set up logging. We customize things to look nice.
53 # handler=logging.StreamHandler(sys.stderr)
54 # handler.setFormatter(logging.Formatter(
55 # 'pull-inputs:%(levelname)s:%(asctime)s: %(message)s',
56 # '%Y%m%d.%H%M%S'))
57 # logger=logging.getLogger()
58 # logger.setLevel(logging.INFO)
59 # logger.addHandler(handler)
60 
61 def parse_arguments():
62  global logger, outputdir, WORKhwrf, HOMEhwrf, cycles, inputscript
63  try:
64  (optlist,args) = getopt.getopt(sys.argv[1:],'o:w:vi:')
65  for opt,val in optlist:
66  if opt=='-v':
67  logger.setLevel(logging.DEBUG)
68  logger.debug('Verbosity enabled.')
69  elif opt=='-w':
70  logger.info('Work area set to %s'%(val,))
71  WORKhwrf=val
72  elif opt=='-o':
73  logger.info('Output area set to %s'%(val,))
74  outputdir=val
75  elif opt=='-i':
76  logger.info('Input script set to %s'%(val,))
77  inputscript=val
78  else:
79  usage('Invalid option %s'%(opt,))
80  except (getopt.GetoptError,ValueError,TypeError) as e:
81  usage(str(e))
82  sys.exit(1)
83  HOMEhwrf=os.environ.get('HOMEhwrf','')
84  if not HOMEhwrf:
85  HOMEhwrf=os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
86  cwd=os.getcwd()
87  if not outputdir:
88  outputdir=os.path.join(cwd,'hwrfdata')
89  if not WORKhwrf:
90  WORKhwrf=os.path.join(cwd,'input-temp')
91  if inputscript is None:
92  if len(args)<2:
93  usage('Input script and one cycle must be specified.')
94  inputscript=args[0]
95  cycles=args[1:]
96  elif len(args)<1:
97  usage('At least one cycle must be specified.')
98  else:
99  cycles=args
100  logger.info('inputscript is %s'%(inputscript,))
101  logger.info('cycles: '+str(cycles))
102 
103 def make_conf(cycle):
105  conf.set('config','cycle',cycle)
106  conf.cycle=cycle
107  assert(conf.cycle is not None)
108  for basename in conf_input_list:
109  fullname=os.path.join(HOMEhwrf,'parm',basename)
110  if not os.path.exists(fullname):
111  logger.error('%s: does not exist. HWRF is not installed in %s'%(
112  fullname,HOMEhwrf))
113  sys.exit(1)
114  conf.read(fullname)
115  conf.set('holdvars','CASE_ROOT','HISTORY')
116  conf.set('hwrfdata','inputroot',outputdir)
117  conf.set('config','case_root','HISTORY')
118  conf.set('config','fcsthist','hist')
119  conf.set('config','realtime','false')
120  conf.guess_default_values()
121  assert(conf.cycle is not None)
122  conf.set('dir','HOMEhwrf',HOMEhwrf)
123  conf.set('dir','WORKhwrf',WORKhwrf)
124  conf.set('dir','com',WORKhwrf)
125  return conf
126 
127 def expand_lists(inputs,stack,names,lists):
128  if not names:
129  # We have finished iterating.
130  inputs.append(stack[-1])
131  return
132  # Iterate into the next list, expanding all items.
133  for item in lists[0]:
134  addme=dict(stack[-1])
135  addme[names[0]]=item
136  stack.append(addme)
137  expand_lists(inputs,stack,names[1:],lists[1:])
138 
139 def pull_command(inputs,dataset,atime,lists,args):
140  assert(isinstance(lists,dict))
141  pull=dict()
142  listflags=set()
143  ftimes=list()
144  pull.update(dataset=dataset,atime=atime,item=args[0])
145  for arg in args[1:]:
146  if arg=='optional':
147  pull['optional']=True
148  elif len(arg)>1 and arg[0]=='+':
149  if arg[1:] not in lists:
150  raise Exception('%s: list not in %s'%(arg[1:],lists))
151  listflags.add(arg[1:])
152  elif '0123456789'.find(arg[0])>=0:
153  parts=arg.split('..')
154  if len(parts)==1:
155  fhr=int(parts[0])
156  sec=fhr*3600
157  ftimes.append(to_datetime_rel(sec ,atime))
158  elif len(parts)==2:
159  start_fhr=int(parts[0])
160  end_fhr=int(parts[1])
161  for hr in xrange(start_fhr,end_fhr+1):
162  sec=hr*3600
163  ftimes.append(to_datetime_rel(sec,atime))
164  elif len(parts)>2:
165  start_fhr=int(parts[0])
166  end_fhr=int(parts[2])
167  step_fhr=int(parts[1])
168  for hr in xrange(start_fhr,end_fhr+1,step_fhr):
169  sec=hr*3600
170  ftimes.append(to_datetime_rel(sec,atime))
171  else:
172  raise Exception('Unrecognized argument %s'%(arg,))
173  if not ftimes:
174  ftimes.append(atime)
175  listnames = [ listname for listname in listflags ]
176  listlists = [ lists[listname] for listname in listnames ]
177  for ftime in ftimes:
178  addme=dict(pull)
179  addme.update(ftime=ftime)
180  stack=list()
181  stack.append(addme)
182  expand_lists(inputs,stack,listnames,listlists)
183 
184 def make_input_list(conf,inputstream):
185  dataset=None
186  cycle=conf.cycle
187  atime=cycle
188  lists=dict()
189  inputs=list()
190  for line in inputstream:
191  stripped=line.strip()
192  if stripped[0:1]=='#': continue
193  parts=stripped.split()
194  if len(parts)<1: continue
195  cmd=parts[0].upper()
196  if cmd=='DATASET' and len(parts)>1:
197  #print 'DATASET',parts[1:]
198  dataset=parts[1]
199  elif cmd=='ATIME' and len(parts)==2:
200  rel=int(parts[1])
201  atime=to_datetime_rel(rel*3600,cycle)
202  elif cmd=='ATIME' and len(parts)==1:
203  atime=cycle
204  elif ( cmd=='FILL' or cmd=='FILLINT') and len(parts)==4:
205  nums=parts[3].split('..')
206  listname=parts[1]
207  format=parts[2]
208  if len(nums)==1:
209  lists[listname]=(format%int(nums[0]))
210  elif len(nums)==2:
211  lists[listname] = \
212  [str(format)%int(num) \
213  for num in xrange(int(nums[0]),int(nums[1])+1)]
214  elif len(nums)>=3:
215  lists[listname] = \
216  [str(format)%int(num) \
217  for num in xrange(int(nums[0]),int(nums[2])+1,int(nums[1]))]
218  if cmd=='FILLINT':
219  old=lists[listname]
220  lists[listname]=[ int(O) for O in old ]
221  elif cmd=='SET' and len(parts)>2:
222  #print 'SET',parts[1:]
223  lists[parts[1]]=parts[2:]
224  elif cmd=='SETINT' and len(parts)>2:
225  #print 'SET',parts[1:]
226  lists[parts[1]]=[ int(part) for part in parts[2:] ]
227  elif cmd=='PULL' and len(parts)>1:
228  #print 'PULL',parts[1:]
229  pull_command(inputs,dataset,atime,lists,parts[1:])
230  else:
231  pass #print 'IGNORE',cmd,parts[1:]
232  return inputs
233 
234 def dump_list(inputs):
235  logger.info('List of inputs to pull:')
236  for i in inputs:
237  s=' '.join([ '%s=%s'%(k,v) for k,v in i.iteritems() ])
238  logger.info(s)
239 
240 def transfer_inputs(inputs,conf,cycle):
241  hwrfdata=DataCatalog(conf,'hwrfdata',cycle)
242  htar=exe(conf.getexe('htar'))
243  hsi=exe(conf.getexe('hsi'))
244  insec=conf.getstr('config','input_sources')
245  ins=InputSource(conf,insec,cycle,htar,logger,hsi)
246  return bool(ins.get(inputs,hwrfdata,False,logger,True))
247 
248 def main():
249  global logger
250  produtil.setup.setup(send_dbn=False,thread_logger=True)
251  logger=logging.getLogger('pull_inputs')
252  parse_arguments()
253  okay=True
254  for cycle in cycles:
255  conf=make_conf(cycle)
256  with open(inputscript,'rt') as inf:
257  inputs=make_input_list(conf,inf)
258  dump_list(inputs)
259  okay=okay and transfer_inputs(inputs,conf,cycle)
260  if not okay:
261  sys.exit(1)
262 
263 if __name__=='__main__':
264  main()
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
Contains setup(), which initializes the produtil package.
Definition: setup.py:1
Sets up signal handlers to ensure a clean exit.
Definition: sigsafety.py:1
A shell-like syntax for running serial, MPI and OpenMP programs.
Definition: run.py:1
def setup(ignore_hup=False, dbnalert_logger=None, jobname=None, cluster=None, send_dbn=None, thread_logger=False, thread_stack=2 **24, kwargs)
Initializes the produtil package.
Definition: setup.py:15
A replacement for the hwrf.config.HWRFConfig used throughout the HWRF system.
Definition: launcher.py:555
Obtains input data needed by various subclasses of hwrf.hwrftask.HWRFTask.
Definition: input.py:1
Time manipulation and other numerical routines.
Definition: numerics.py:1
Creates the initial HWRF directory structure, loads information into each job.
Definition: launcher.py:1