HWRF  trunk@4391
input.py
1 
2 """!Obtains input data needed by various subclasses of
3 hwrf.hwrftask.HWRFTask.
4 
5 This module implements the functionality described in
6 hwrf.hwrftask.HWRFTask.inputiter(). It takes many HWRF tasks, asks
7 them what input is needed and collates that information. It has a
8 list of many possible input sources, and knows which ones are
9 available from which cluster. It goes through available input sources
10 in priority order, obtaining the input data."""
11 
12 ##@var __all__
13 # Symbols exported by "from hwrf.input import *"
14 __all__=["DataCatalog","InputSource",'in_date_range']
15 
16 import collections, os, ftplib, tempfile, ConfigParser, urlparse, stat, \
17  re, threading, time, datetime, StringIO
18 # Restored import: these produtil submodules are referenced by their full
19 # module paths later in this file (produtil.cd, produtil.cluster, produtil.fileop,
20 # produtil.listing, produtil.workpool).
import produtil.cd, produtil.cluster, produtil.fileop, produtil.listing, produtil.workpool
21 
22 from produtil.run import alias, batchexe, checkrun, ExitStatusException, run
23 from produtil.fileop import deliver_file, isnonempty, make_symlink, makedirs
24 from hwrf.numerics import to_datetime, to_datetime_rel, to_timedelta
25 from hwrf.exceptions import InputSourceBadType,PartialTransfer,\
26  UnsupportedTransfer,InvalidInputSpecification,InvalidLogin
# Note: InvalidInputSpecification and InvalidLogin are referenced later in this
# module; they are assumed to be provided by hwrf.exceptions.
27 
28 ########################################################################
29 def in_date_range(t,trange):
30  """!Is this time in the given time range?
31 
32  @param t A time as a ten digit number. For example, 1830123118 is
33  December 31, 1830 at 18:00 UTC.
34  @param trange A comma-separated list of time ranges such as
35  this:
36  * 2015081412 --- 12:00 UTC on August 14, 2015
37  * 2015081412-2015082318 --- From 12:00 UTC on August 14, 2015
38  through 18:00 UTC on August 23, 2015
39  * 2015081412-2015082318,2011010100-2011123123 --- From 12:00 UTC
40  on August 14, 2015 through 18:00 UTC on August 23, 2015 and all
41  of year 2011.
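 For example, a minimal illustration of the expected behavior (purely for clarity):
 @code{.py}
 in_date_range('2015081500','2015081412-2015082318')   # True: within the range
 in_date_range('2016010100','2015081412-2015082318,2011010100-2011123123')   # False: in neither range
 @endcode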
42  @returns True if t falls in the range trange, or False otherwise."""
43  epsilon=to_timedelta('1800') # epsilon = one half hour
44  t=to_datetime(t)
45  for tr in trange.split(','):
46  idash=tr.find('-')
47  if idash<0:
48  # single date
49  start=to_datetime(tr)
50  if t>=to_datetime_rel(-epsilon,start) \
51  and t<=to_datetime_rel(epsilon,start):
52  return True
53  else:
54  # date range
55  start=to_datetime(tr[0:10])
56  end=to_datetime(tr[idash+1:idash+11])
57  if t>=to_datetime_rel(-epsilon,start) \
58  and t<=to_datetime_rel(epsilon,end):
59  return True
60  return False
61 
62 
63 ########################################################################
64 def tempopen(f,m):
65  """!Convenience function that opens a temporary file using
66  tempfile.NamedTemporaryFile."""
67  produtil.fileop.makedirs(os.path.dirname(f))
68  return tempfile.NamedTemporaryFile(prefix=os.path.basename(f),
69  dir=os.path.dirname(f),
70  mode=m,suffix='.tmp',delete=False)
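# A minimal usage sketch of the pattern used throughout this module (the
# destination path below is a placeholder): write into the temporary file,
# then atomically rename it into place.
#   with tempopen('/path/to/dest','wb') as f:
#       f.write(data)
#       tempname=f.name
#   os.rename(tempname,'/path/to/dest')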
71 
72 ########################################################################
73 
74 def strsrc(d):
75  """!Makes a string version of a dataset+item dict as produced by
76  hwrf_expt.inputiter() or hwrf.hwrftask.HWRFTask.inputiter()"""
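    # For example (illustrative only):
    #   strsrc({'dataset':'gfs','item':'gfs_sf','fahr':48})  # -> "gfs(gfs_sf, fahr=48)"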
77  s=StringIO.StringIO()
78  s.write("%s(%s"%(d.get("dataset","(**no*dataset**)"),
79  d.get("item","(**no*item**)")))
80  for k in sorted(list(d.iterkeys())):
81  if k=='dataset' or k=='item': continue
82  v=d[k]
83  if isinstance(v,datetime.datetime):
84  s.write(', %s=%s'%(str(k),v.strftime('%Y-%m-%d_%H:%M:%S')))
85  else:
86  s.write(', %s=%s'%(str(k),repr(v)))
87  s.write(')')
88  return s.getvalue()
89 
108 ########################################################################
109 class DataCatalog(object):
110  """!Provides the location of a file in an archive, on disk or on a
111  remote server via sftp or ftp.
112 
113  This class is a collection of functions that know how to provide
114  the location of a file in either an archive or a filesystem. It
115  does not know how to actually obtain the file. This serves as the
116  underlying "where is that file" implementation of InputSource.
117  All of this is driven by a section in an hwrf.config.HWRFConfig
118  object.
119 
120  For example, suppose one set up this configuration file:
121  @code{.conf}
122  [wcoss_fcst_nco]
123  # WCOSS: Input locations for the production HWRF
124  gfs = /com/gfs/prod/gfs.{aYMD}/
125  gdas1 = /com/gfs/prod/gdas.{aYMD}/
126  gfs_sf = gfs.t{aHH}z.sf{fahr:02d}
127  gfs_sfcanl = gfs.t{aHH}z.sfcanl
128  gdas1_bufr = gdas1.t{aHH}z.{obstype}.tm00.bufr_d
129  @endcode
130 
131  In this example, "gfs" is a dataset, while "gfs_sfcanl" is an item
132  in the dataset. The DataCatalog.locate() function can find the
133  location of a gfs_sf file given the inputs required for string
134  expansion by hwrf.config.HWRFConfig.timestrinterp(). In this
135  case, only the analysis time is required for the "{aYMD}" in the
136  dataset location and "{aHH}" in the gfs_sfcanl filename.
137  @code{.py}
138  dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
139  sfcanl=dc.locate("gfs","gfs_sfcanl")
140  print sfcanl
141  @endcode
 142  That code would print "/com/gfs/prod/gfs.20150918/gfs.t00z.sfcanl"
143  which is the operational output path of the GFS surface analysis
144  file for the analysis time in question.
145 
146  Suppose we wanted the spectral forecast file, "gfs_sf" instead,
147  for forecast hour 54. That also requires the forecast time
148  ("ftime") in order to fill in the "{fahr:02d}" in the filename
149  with the number 54.
150  @code{.py}
151  dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
 152  sf54a=dc.locate("gfs","gfs_sf",ftime="2015092006")
 153  sf54b=dc.locate("gfs","gfs_sf",ftime=54*3600)
 154  print sf54a
 155  print sf54b
 156  @endcode
 157  That code would print "/com/gfs/prod/gfs.20150918/gfs.t00z.sf54"
158  twice. Note that you can specify the forecast time as an absolute
159  time, or as a number of seconds relative to the analysis time and
160  achieve the same effect either way.
161 
162  If we want the bufr file, we have to provide one more piece of
163  information: the observation type, to fill in "{obstype}".
164  @code{.py}
165  dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
166  gpm=dc.locate("gdas1","gdas1_bufr",obstype="gpm")
167  print gpm
168  @endcode
169  which prints "/com/gfs/prod/gdas.20150918/gdas1.t00z.gpm.tm00.bufr_d"
170  """
171  def __init__(self,conf,section,anltime):
172  """!DataCatalog constructor
173  @param conf the configuration object, an hwrf.config.HWRFConfig
174  @param section the section that provides location information
175  @param anltime the default analysis time """
176  self.conf=conf
177  if not isinstance(section,basestring):
178  raise TypeError('In DataCatalog.__init__, section must be a '
179  'string.')
180  self.section=section
181  self.anltime=to_datetime(anltime)
182  ##@var section
183  # The section used for dataset and item locations in conf.
184 
185  ##@var conf
186  # The configuration object, an hwrf.config.HWRFConfig or subclass.
187 
188  ##@var anltime
189  # The default analysis time for parse() and locate() if none is
190  # specified.
191 
192  def __repr__(self):
193  """!A string representation of this DataCatalog"""
194  if isinstance(self.anltime,datetime.datetime):
195  stime=self.anltime.strftime('%Y%m%d%H')
196  else:
197  stime=str(self.anltime)
198  return "DataCatalog(conf,%s,%s)"%(repr(self.section), stime)
199  def rt_updated(self):
200  """!Is this dataset updated in real-time?
201 
202  @returns True if this dataset is updated in real-time, False
203  otherwise. By default, this will return True if
204  conf[section,"rt_updated"] is set to "yes" or False otherwise."""
205  try:
 206  return self.conf.getbool(self.section,'rt_updated',False)
207  except ( ConfigParser.Error,KeyError,TypeError,ValueError ) as e:
208  return False
209  def parse(self,string,atime=None,ftime=None,logger=None,dates=None,
210  **kwargs):
211  """!Internal function that performs string interpolation.
212 
213  This is an internal implementation function that you should
214  not call directly. It performs string interpolation using the
215  underlying conf object. This acts exactly like the expansions
216  done in the hwrf.conf file: {stuff} is expanded to the
217  contents of the "stuff" variable. Expansions are done in the
218  section specified in the constructor. In addition, various a*
219  and f* variables are expanded based on the analysis time
220  ("atime") and forecast time ("ftime"). See
221  hwrf.config.HWRFConfig.timestrinterp() for details.
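 For example, an illustrative sketch reusing the [wcoss_fcst_nco] section
 from the class docstring:
 @code{.py}
 dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
 dc.parse("gfs_sfcanl")            # -> "gfs.t00z.sfcanl"
 dc.parse("gfs_sf",ftime=6*3600)   # -> "gfs.t00z.sf06"
 @endcode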
222  @param string the string being expanded
223  @param atime Optional: the analysis time. The default is self.anltime
224  @param ftime Optional: the forecast time.
225  @param logger Optional: a logging.Logger for log messages
226  @param dates Optional: dates for which this datasource is valid.
227  This is passed to in_date_range() for validation. This is
228  used to implement the InputSource date ranges.
229  @param kwargs Additional keyword arguments are passed to the
230  hwrf.config.HWRFConfig.timestrinterp() for string replacement.
231  @returns The return value from string interpolation or None if
232  nothing was found."""
233  if atime is None:
234  if logger is not None:
235  logger.info(
236  '{%s}: has no atime. Will use atime=self.anltime=%s.'%(
 237  str(string),repr(self.anltime)))
238  atime=self.anltime
239  if ftime is None:
240  if logger is not None:
241  logger.info('{%s}: has no ftime. Will use ftime=atime=%s.'%(
242  str(string),repr(atime)))
243  ftime=atime
244  atime=to_datetime(atime)
245  ftime=to_datetime_rel(ftime,atime)
246  if dates is not None and atime is not None:
247  if not in_date_range(atime,dates):
248  if logger is not None:
249  logger.info('{%s}: atime %s not in %s'%(
250  str(string),str(atime),str(dates)))
251  return None
252  if logger is not None:
253  logger.info(
254  'parsing {%s} with ftime=%s atime=%s in section %s'
255  %(str(string),repr(ftime),repr(atime),repr(self.section)))
256  return self.conf.timestrinterp(
257  self.section,"{"+string+"}",ftime,atime,**kwargs)
258  def locate(self,dataset,item,atime=None,ftime=None,logger=None,
259  dates=None,**kwargs):
260  """!Find the location of a requested piece of data.
261 
262  Locates the specified item for the specified dataset, at the
263  given analysis time ("atime") and forecast time ("ftime"). If
264  the requested data is known to not exist, returns None. This
265  should be overridden by subclasses. The present
266  implementation just does this: {dataset}/{item} expanding
267  dataset and item with self.parse. Any kwargs are passed
268  along: this allows such things as ensemble ID, or switching
269  between GRIB1 or GRIB2 via a keyword argument.
270  @param dataset The name of the dataset.
271  @param item The name of the item in the dataset.
272  @param atime Optional: the analysis time. The default is self.anltime.
273  @param ftime Optional: the forecast time which can be anything
274  accepted by hwrf.numerics.to_datetime_rel() relative to the
275  analysis time.
276  @param logger Optional: a logging.Logger for log messages. If this
277  is provided, several steps along the way of finding the data
278  location are logged.
279  @param dates Optional: dates for which this datasource is valid.
280  This is passed to in_date_range() for validation. This is
281  used to implement the InputSource date ranges.
282  @param kwargs Additional keyword arguments are passed by
283  parse() to the hwrf.config.HWRFConfig.timestrinterp() for
284  string replacement.
285  @return The path to the requested data or None if it is not found."""
286  if logger is not None:
287  logger.info(
288  'locate item=%s atime=%s ftime=%s in dataset=%s'
289  %(repr(item),repr(atime),repr(ftime),repr(dataset)))
290  ds=self.parse(dataset,atime=atime,ftime=ftime,logger=logger,
291  dates=dates,**kwargs)
292  if ds is None: return None
293  it=self.parse(item,atime=atime,ftime=ftime,logger=logger,**kwargs)
294  result=ds+it
295  if logger is not None:
296  logger.info( 'result %s %s => %s'%(
297  repr(ds),repr(it),repr(result),))
298  return result
299 
300 ########################################################################
301 class InputSource(object):
302  """!Fetch data from multiple sources.
303 
304  This class knows how to fetch data from remote clusters, or the
305  local machine. The data locations are specified by a several
306  DataCatalog sections, each of which is given a priority, a valid
307  set of dates and a file transfer mechanism. Data catalogs are
308  tried in priority order. Files are obtained in multiple threads
309  at once, and several file transfer mechanisms are understood:
310 
311  * file:// --- obtain files on disk
312  * ftp:// --- contact an FTP server
313  * sftp:// --- contact a server over SSH. SSH-based rsync is used.
314  * htar:// --- use the proprietary htar program to get a tape archive
315 
316  However, only one DataCatalog is examined at a time. All threads
317  work on that one DataCatalog until all data that can be obtained
318  from it is done. Then the threads exit, and new ones are spawned
319  to examine the next DataCatalog.
320 
321  For example, suppose you are on the Jet supercomputer running a
322  HISTORY (retrospective) simulation. You set up this configuration
323  section in your hwrf.conf config file:
324  @code{.conf}
325  [jet_sources_prod2014]
326  jet_hist_PROD2014%location = file:///
327  jet_hist_PROD2014%histprio=90
328  jet_hist_PROD2014%fcstprio=90
329 
330  prod15_data_sp%location=htar://
331  prod15_data_sp%histprio=59
332  prod15_data_sp%dates=2015011218-2015123118
333 
334  [jet_hist_PROD2014]
335  @inc=gfs2014_naming
336  inputroot2014=/lfs3/projects/hwrf-data/hwrf-input
337  gfs={inputroot2014}/HISTORY/GFS.{aYYYY}/{aYMDH}/
338  gfs_sfcanl = gfs.t{aHH}z.sfcanl
339 
340  [prod15_data_sp]
341  inputroot=/NCEPPROD/2year/hpssprod/runhistory/rh{aYYYY}/{aYYYY}{aMM}/{aYMD}
342  gfs={inputroot}/
343  gfs_sfcanl = {gfs_tar}#./gfs.t{aHH}z.sfcanl
344 
345  [hwrfdata]
346  inputroot=/pan2/projects/hwrfv3/John.Doe/hwrfdata
347  gfs={inputroot}/hwrf.{aYMDH}/
348  gfs_sfcanl = gfs.t{aHH}z.sfcanl
349  @endcode
350  and this is the code:
351  @code{.py}
 352  inputs=InputSource(conf,"jet_sources_prod2014","2015071806")
 353  hwrfdata=DataCatalog(conf,"hwrfdata","2015071806")
 354  inputs.get([
 355  {"dataset":"gfs", "item":"gfs_sfcanl","atime":"2015071800"},
 356  {"dataset":"gfs", "item":"gfs_sfcanl","atime":"2015071806"},
 357  {"dataset":"gfs", "item":"gfs_sfcanl","atime":"2015071812"} ],
 358  hwrfdata,realtime=False)
359  @endcode
360 
361  In this example, the InputSource will look for three GFS surface
362  analysis files. It will search two possible locations for them:
363  the on-disk Jet "PROD2014" history location and the NCO production
364  tape files. The disk location will be searched first because its
365  history priority is 90, while the tape area has a priority of 59.
366 
367  Three files will show up eventually:
368 
369  * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071800/gfs.t00z.sfcanl
370  * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071806/gfs.t06z.sfcanl
371  * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071812/gfs.t12z.sfcanl
372 
373  Each file will come from either here:
374 
 375  * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015/2015071800/gfs.t00z.sfcanl
 376  * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015/2015071806/gfs.t06z.sfcanl
 377  * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015/2015071812/gfs.t12z.sfcanl
378 
379  or here:
380 
381  * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071800gfs.tar ./gfs.t00z.sfcanl
382  * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071806gfs.tar ./gfs.t06z.sfcanl
383  * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071812gfs.tar ./gfs.t12z.sfcanl """
384  def __init__(self,conf,section,anltime,htar=None,logger=None,hsi=None):
385  """!InputSource constructor.
386  @param conf the hwrf.config.HWRFConfig to use for
387  configuration info
388  @param section the section that specifies the list of data catalogs
389  @param anltime the default analysis time
390  @param htar the produtil.prog.Runner that runs htar
391  @param logger a logging.Logger for log messages
392  @param hsi the produtil.prog.Runner that runs hsi"""
393  self.conf=conf
394  self.section=section
395  self.anltime=anltime
396  def none():
397  return None
398  def dictnone():
399  return collections.defaultdict(none)
400  self._sftp_dir_ok=collections.defaultdict(dictnone)
401  self._logger=logger
402  self.forecast=list() # FORECAST mode DataCatalogs
403  self._f_sorted=True
404  self.history=list() # HISTORY mode DataCatalogs
405  self._h_sorted=True
406  self.locks=collections.defaultdict(threading.Lock)
407  assert(htar is not None)
408  assert(hsi is not None)
409  self.htar=alias(htar)
410  self.hsi=alias(hsi)
411  self.valid=collections.defaultdict(None)
412 
413  sections=[section]
414  if conf.has_option(section,'@inc'):
415  sections.extend(conf[section,'@inc'].split(','))
416 
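 # Options of the form "srcname%attribute" each describe one attribute of one
 # data source; collect them into per-source dicts (see the
 # [jet_sources_prod2014] example in the class docstring).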
417  sources=collections.defaultdict(dict)
418  for sec in sections:
419  for key in conf.keys(sec):
420  c=key.find('%')
421  if(c>0):
422  (src,attr)=(key[0:c],key[c+1:])
423  try:
424  sources[src][attr]=conf.get(sec,key)
425  except KeyError as ke:
426  if logger is not None:
427  logger.warning("[%s] %s: key error: %s"%(
428  sec,key,str(ke)))
429  continue
430  bad=list()
431  for (src,attr) in sources.iteritems():
432  if 'location' in attr and ('histprio' in attr or \
433  'fcstprio' in attr):
434  dctype=attr.get('type','DataCatalog')
435  if dctype=='DataCatalog':
436  dc=DataCatalog(self.conf,src,self.anltime)
437  else:
438  raise InputSourceBadType(
439  'Do not know how to make a DataCatalog of type "%s"'
440  %(dctype,))
441  if 'dates' in attr:
442  dates=attr['dates']
443  else:
444  dates='1970010100-2038011818'
445  self.add(dc,location=attr['location'],
446  fcstprio=attr.get('fcstprio',None),
447  histprio=attr.get('histprio',None),
448  dates=dates)
449  else:
450  logger.warning('Bad source %s: must have location and either histprio or fcstprio.'%(src,))
451  bad.append(str(src))
 452  if bad:
 453  raise InvalidInputSpecification( # restored line; exception name is assumed (see hwrf.exceptions)
 454  'Input sources must have location and either histprio or '
 455  'fcstprio. Check options in [%s]: %s and rerun launcher '
 456  'job.'%(self.section,', '.join(bad)))
457  self._sort()
458  ##@var conf
459  # The hwrf.config.HWRFConfig object used for configuration info
460 
461  ##@var section
462  # The section in conf that contains the data catalog list and relevant info
463 
464  ##@var anltime
465  # The default analysis time.
466 
467  ##@var forecast
468  # List of forecast mode DataCatalog objects.
469 
470  ##@var history
471  # List of history mode DataCatalog objects.
472 
473  ##@var locks
474  # Lock objects to restrict access to FTP servers to one thread at a time.
475 
476  ##@var htar
477  # A produtil.prog.ImmutableRunner that runs htar.
478 
479  ##@var hsi
480  # A produtil.prog.ImmutableRunner that runs hsi.
481 
482  ##@var valid
483  # Data source validitiy information.
484 
485  def _rsync_ssh_exe(self,netpart,path=None,checkdir='/',dest=None):
486  """!Creates a produtil.prog.Runner for running rsync over ssh.
487 
488  Returns a Runner object (as in produtil.run) for executing
489  rsync -e ssh. This subroutine is used to implement
490  workarounds for known bugs.
491  @param netpart The netpart portion of the sftp URL.
492  @param path The path portion of the sftp URL.
493  @param dest The destination on the local disk."""
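 # For illustration, the transfer form of the command built below is roughly
 # (with the Jet workaround active; paths are placeholders):
 #   rsync -e ssh --protocol 29 -LvptgoD user@host:/remote/path /local/dest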
494  rsync=self.conf.getexe('rsync','rsync')
495  if 'jet' in netpart or produtil.cluster.name()=='jet':
496  # Workaround for Jet bug: use protocol 29
497  cmd=alias(batchexe(rsync)['-e','ssh','--protocol','29'])
498  else:
499  cmd=alias(batchexe(rsync)['-e','ssh'])
500  if path and dest:
501  cmd=cmd['-LvptgoD',"%s:%s"%(netpart,path),dest]
502  else:
503  # Don't transfer a file. Just check access.
504  cmd=cmd['-d','%s:%s'%(netpart,checkdir)]
505  return cmd
506  def _sort(self):
507  """!Sorts the list of history and forecast DataCatalogs by
508  decreasing priority."""
509  self.forecast=sorted(self.forecast,key=lambda x: -x[0])
510  self.history=sorted(self.history,key=lambda x: -x[0])
511  def add(self,dc,location,fcstprio=None,histprio=None,dates=None):
512  """!Adds a DataCatalog to this InputSource.
513 
514  Called automatically from the constructor to add a DataCatalog
515  to this InputSource. The list of add() calls is generated
516  from the config section specified in the constructor. You
517  should never need to call this function unless you want to
518  explicitly add more DataCatalog objects that are not listed in
519  the config files.
520 
521  The location parameter is a URL from file, sftp, ftp or htar.
522  Examples:
523 
524  * local files: file:///lfs3/projects/hwrf-data/hwrf-input/
525  * scp: sftp://Some.Username@dtn-zeus.rdhpcs.noaa.gov/
526  * ftp: ftp://anonymous@ftpprd.ncep.noaa.gov/
527  * htar: htar:///NCEPPROD/1year/hpssprod/runhistory/rh2012/201204/20120418/
528 
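 For example, a hypothetical direct call (normally the constructor issues
 these calls for you based on the config section), where src is this
 InputSource:
 @code{.py}
 dc=DataCatalog(conf,"hwrfdata","2015071806")
 src.add(dc,location="file:///",fcstprio=90,histprio=90,
         dates="2015011218-2015123118")
 @endcode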
529  @warning Bad things will happen if you add the same source
530  twice. Bad things.
531  @note If fcstprio and histprio are both None, this call has no
532  effect.
533 
534  @param dc the DataCatelog object
535  @param location the URL of the data source, including the
536  username if needed.
537  @param fcstprio the priority for using this source in FORECAST
538  (real-time) mode. If missing or None, the source will not
539  be used in FORECAST mode.
540  @param histprio the priority for using this source in HISTORY
541  (retrospective) mode. If missing or None,the source will
542  not be used in HISTORY mode.
543 
544  @param dates Dates for which this source is valid. This is
545  passed to the trange argument of in_date_range(t,trange) """
546  if fcstprio is None and histprio is None: return
547  if dates is None:
548  dates='1970010100-2038011818'
549  parsed=urlparse.urlparse(location)
550  if fcstprio is not None:
551  self.forecast.append( ( float(fcstprio), location, parsed, dc, dates ) )
552  self._f_sorted=False
553  if histprio is not None:
554  self.history.append( ( float(histprio), location, parsed, dc, dates ) )
555  self._h_sorted=False
556  def open_ftp(self,netpart,logger=None,timeout=20):
557  """!Opens an FTP connection
558 
559  Opens the specified ftp://user@host/... request subject to the
560  specified timeout, logging to the specified logger (if present
561  and non-Null).
562  @param netpart The netpart portion of the URL
563  @param logger the logging.Logger for log messages
564  @param timeout the connection timeout in seconds"""
565  if logger is None: logger=self._logger
566  if logger is not None:
567  logger.info('open_ftp %s'%(netpart,))
568  r=re.search('([a-zA-Z0-9_.-]+)+@(.+)',netpart)
569  if r:
570  (user,host)=r.groups()
571  if not user or not host:
572  raise InvalidLogin(
573  'FTP logins must be of the form user@host but you '
574  'gave "%s"'%(netpart))
575  else:
576  (user,host)=('anonymous',netpart)
577  f=None
578  try:
579  if logger is not None: logger.info('%s@%s: log in'%(user,host))
580  f=ftplib.FTP(host,user,timeout=timeout)
581  f.login()
582  assert(f is not None)
583  retval=f
584  f=None
 585  self.valid['ftp://'+netpart]=True
 586  return retval
 587  except Exception as e:
 588  self.valid['ftp://'+netpart]=False
 raise # re-raise so callers do not silently receive None
589  finally:
590  if f is not None:
591  if logger is not None:
592  logger.warning('In finally block, closing FTP stream.')
593  f.close()
594  def rsync_check_access(self,netpart,logger=None,timeout=20,dirpath='/'):
595  """!Checks to see if rsync can even access a remote server.
596  @param netpart the netpart portion of the URL
597  @param logger the logging.Logger for log messages
598  @param timeout the connection timeout in seconds
599  @returns True if the server is accessible and False otherwise"""
600  try:
601  cmd=self._rsync_ssh_exe(netpart,checkdir=dirpath)
602  checkrun(cmd,logger=logger)
603  return True
604  except Exception as e:
605  if logger is not None:
606  logger.warning('%s: rsync cannot access: %s'
607  %(str(netpart),str(e)))
608  return False
609 
610  def fetch_file(self,streams,dc,dsurl,urlmore,dest,logger=None,
611  timeout=20,realtime=True):
612  """!Internal implementation function that fetches one file.
613 
614  You should not call this directly; it is meant to be called
615  by "get" and re-implemented in subclasses. This grabs one
616  file, potentially from a remote location. The URL for the
617  base directory of some dataset is in dsurl, while the specific
618  file is in urlmore. The urlmore will be appended to the file
619  part of dsurl via urljoin, and the resulting file will be
620  transferred.
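 For example, an illustrative sketch of the URL joining only (the path is a
 placeholder):
 @code{.py}
 urlparse.urljoin("ftp://anonymous@ftpprd.ncep.noaa.gov/pub/gdas.20150918/",
                  "gdas1.t00z.gpm.tm00.bufr_d")
 # -> "ftp://anonymous@ftpprd.ncep.noaa.gov/pub/gdas.20150918/gdas1.t00z.gpm.tm00.bufr_d"
 @endcode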
621  @param streams a list used to store opened streams
622  @param dc the DataCatalog being obtained
623  @param dsurl the URL of the DataCatalog
624  @param urlmore additional parts of the URL such as the
625  reference or HTTP Get
626  @param dest The local disk destination
627  @param logger the logging.Logger for log messages
628  @param timeout the connection timeout in seconds
629  @param realtime True for FORECAST mode, False for HISTORY mode.
630  @returns True if successful, False if not"""
631  if logger is None: logger=self._logger
632  parsed=urlparse.urlparse(dsurl)
633  joined=urlparse.urljoin(dsurl,urlmore,allow_fragments=True)
634  parsed=urlparse.urlparse(joined)
635  if logger is not None:
636  logger.info('%s + %s = %s',repr(dsurl),repr(urlmore),repr(joined))
637  scheme=parsed.scheme
638  path=parsed.path
639  netpart=parsed.netloc
640  n="%s://%s"%(scheme,netpart)
641  if scheme== 'file':
642  return self._impl_fetch_file(
643  parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
644  logger,timeout,realtime)
645  elif scheme=='ftp':
646  with self.locks[n]:
647  return self._impl_fetch_ftp(
648  parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
649  logger,timeout,realtime)
650  elif scheme=='sftp':
651  return self._impl_fetch_sftp(
652  parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
653  logger,timeout,realtime)
654  else:
655  raise UnsupportedTransfer(
656  'Cannot transfer this url: unsupported method (not htar, '
657  'ftp, file or sftp): '+joined)
658  return True
659  def _impl_fetch_file(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
660  urlmore,dest,logger,timeout,realtime):
661  """!Fetches a file from local disk by making a symbolic link.
662  @param parsed The parsed URL from urlparse.urlparse
663  @param joined The joined URL from urlparse.urljoin
664  @param scheme The data transfer scheme (ftp, sftp, etc.)
665  @param path The URL path
666  @param netpart the netpart portion of the URL.
667  @param streams the array of transfer streams
668  @param dc the DataCatalog for the remote data
669  @param dsurl the dataset URL
670  @param urlmore section and other parts of the URL
671  @param dest the local disk destination
672  @param logger the logging.Logger for messages, or None
673  @param timeout connection timeout in seconds, ignored
674  @param realtime True for FORECAST mode, False if not. In
675  FORECAST mode, the symbolic link is made even if the file
676  does not exist, so long as the DataCatalog is marked as
677  realtime (DataCatalog.rt_updated() returns True)
678  @returns True on success, False if the file was not linked"""
679  if logger is not None:
680  logger.info('%s: from local file %s'%(dest,joined))
681  if ( realtime and dc.rt_updated() ) or os.path.exists(path):
682  makedirs(os.path.dirname(dest),logger=logger)
683  make_symlink(path,dest,force=True,logger=logger)
684  else:
685  return False
686  #produtil.fileop.deliver_file(path,dest,keep=True,logger=logger)
687  return True
688  def _impl_fetch_sftp(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
689  urlmore,dest,logger,timeout,realtime):
690  """!Fetches a file via rsync over ssh.
691  @param parsed The parsed URL from urlparse.urlparse
692  @param joined The joined URL from urlparse.urljoin
693  @param scheme The data transfer scheme (ftp, sftp, etc.)
694  @param path The URL path
695  @param netpart the netpart portion of the URL.
696  @param streams the array of transfer streams
697  @param dc the DataCatalog for the remote data
698  @param dsurl the dataset URL
699  @param urlmore section and other parts of the URL
700  @param dest the local disk destination
701  @param logger the logging.Logger for messages, or None
702  @param timeout connection timeout in seconds
703  @param realtime True for FORECAST mode, False if not. Ignored.
704  @returns True on success, False if the file was not copied"""
705  tempname=None
706  try:
707  dirpath=os.path.dirname(path)
708  ok=self._sftp_dir_ok[netpart][dirpath]
709  if ok is None:
710  logger.info('%s:%s: check access.'%(netpart,dirpath))
711  ok=self.rsync_check_access(
712  netpart,logger=logger,dirpath=dirpath)
713  self._sftp_dir_ok[netpart][dirpath]=ok
714  if ok is False:
 715  logger.info('%s:%s: skip: directory inaccessible.'%(
716  netpart,path))
717  return False
718  makedirs(os.path.dirname(dest),logger=logger)
719  with tempopen(dest,'wb') as f:
720  tempname=f.name
721  cmd=self._rsync_ssh_exe(netpart,path,tempname)
722  checkrun(cmd,logger=logger)
723  os.rename(tempname,dest)
 724  tempname=None
 725  except ExitStatusException as e: # restored line: checkrun raises this on non-zero exit
 726  if logger is not None:
727  logger.warning("%s: non-zero exit status %s"%(
728  joined,repr(e.returncode)))
729  return False
730  finally:
731  if tempname is not None:
732  if logger is not None:
733  logger.warning('In finally block, deleting temp file %s.'%(tempname,))
734  os.remove(tempname)
735  return True
736  def _impl_fetch_ftp(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
737  urlmore,dest,logger,timeout,realtime):
738  """!Fetches a file over FTP.
739  @param parsed The parsed URL from urlparse.urlparse
740  @param joined The joined URL from urlparse.urljoin
741  @param scheme The data transfer scheme (ftp, sftp, etc.)
742  @param path The URL path
743  @param netpart the netpart portion of the URL.
744  @param streams the array of transfer streams
745  @param dc the DataCatalog for the remote data
746  @param dsurl the dataset URL
747  @param urlmore section and other parts of the URL
748  @param dest the local disk destination
749  @param logger the logging.Logger for messages, or None
750  @param timeout connection timeout in seconds
751  @param realtime True for FORECAST mode, False if not. Ignored.
752  @returns True on success, False if the file was not copied"""
753  n="%s://%s"%(scheme,netpart)
754  if n not in streams:
755  streams[n]=self.open_ftp(n,logger=logger,timeout=timeout)
756  stream=streams[n]
757  tempname=None
758  try:
759  makedirs(os.path.dirname(dest),logger=logger)
760  with tempopen(dest,'wb') as f:
761  tempname=f.name
762  if logger is not None:
763  logger.info('%s: pull %s => %s'
764  %(n,parsed.path,tempname))
765  stream.retrbinary("RETR "+parsed.path,f.write)
766  remote_size=stream.size(parsed.path)
767  if remote_size is not None:
768  local_size=os.path.getsize(tempname)
769  if local_size!=remote_size:
770  if logger is not None:
771  logger.warning(
772  '%s: wrong size: %d local vs %d remote'
773  %(tempname,local_size,remote_size))
774  raise PartialTransfer(
775  'Could not transfer full file: only %d of %d '
776  'bytes transferred.'%(local_size,remote_size))
777  if logger is not None:
778  logger.info('%s: move from %s'%(dest,tempname))
779  os.rename(tempname,dest)
780  tempname=None
781  finally:
782  if tempname is not None:
783  logger.warning('In finally block, removing temp file %s'%(
784  tempname))
785  os.remove(tempname)
786  return True
787  def list_for(self,realtime=True):
788  """!Returns the list of DataCatalog objects for FORECAST or
789  HISTORY mode.
790  @param realtime True for FORECAST mode, False for HISTORY
791  @returns self.forecast or self.history
792  @post _sort() has been called, sorting self.forecast and
793  self.history in order of priority"""
794  if realtime:
795  if not self._f_sorted: self._sort()
796  return self.forecast
797  else:
798  if not self._h_sorted: self._sort()
799  return self.history
800 
801  def _impl_get_archive(self,archpath,parts,done,prio, loc, parsed, dc,
802  data,target_dc,realtime,logger,skip_existing):
803  """!Fetches an archive from HPSS
804  @param archpath path to the archive on HPSS
 805  @param parts dict mapping archive member names to lists of the form
 806  [local target path, index in done, index in done, ...]
 807  @param[out] done set of indices (into the data list) of files obtained
 808  @param prio the priority of this input source
 809  @param loc,parsed,dc,data,target_dc,realtime,skip_existing Ignored.
810  @param logger the logging.Logger for log messages"""
811  with produtil.cd.TempDir(prefix="pull.",cd=False,
812  keep_on_error=False) as td:
813  assert(isinstance(td,produtil.cd.TempDir))
814  assert(self.hsi is not None)
815  if self.hsi is not None:
816  i=self.hsi['get','-',':',archpath+'.idx']>"/dev/null"
817  err=run(i,logger=logger)
818  if err!=0:
819  logger.warning("%s.idx: exit status %d dumping index "
820  "file. Htar will probably fail."
821  %(archpath,int(err)))
822  r=self.htar['-xpf',archpath]\
823  [ [p for p in parts.iterkeys()] ]\
824  .cd(td.dirname)
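 # The command built above is roughly "htar -xpf <archpath> <member> <member> ...",
 # executed in the temporary directory td.dirname.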
825  logger.info('%s: list contents'%(td.dirname,))
 826  for line in str(produtil.listing.Listing(path=td.dirname)).splitlines():
827  logger.info(line)
828  stat=run(r,logger=logger)
829  if stat!=0:
830  logger.info('non-zero exit status %d from htar; will retry '
831  'in five seconds.'%stat)
832  for x in xrange(50):
833  time.sleep(0.1)
834  stat=run(r,logger=logger)
835  if stat!=0:
836  logger.info('non-zero exit status %d from htar; will keep '
837  'going anyway'%stat)
838  if logger is not None:
839  logger.info("%s: pull %d files"
840  %(archpath,len(parts)))
841  nope=set() # Files missing from archive
842  yup=set() # Files found in archive
843  for (filepart,tgti) in parts.iteritems():
844  tgt=tgti[0]
845  src=os.path.join(td.dirname,filepart)
846  logger.debug('%s: check for this at %s'%(tgt,src))
847  if os.path.exists(src):
848  makedirs(os.path.dirname(tgt),logger=logger)
849  deliver_file(src,tgt,keep=False,logger=logger)
850  for i in tgti[1:]:
851  logger.debug('%s: add %d'%(tgt,i))
852  done.add(i)
854  relfile=os.path.relpath(src,td.dirname)
855  relfile=re.sub('^(../)+','',relfile)
856  yup.add(relfile)
857  else:
858  relfile=os.path.relpath(src,td.dirname)
859  relfile=re.sub('^(../)+','',relfile)
860  nope.add(relfile)
861  logger.debug('%s: does not exist'%(src,))
862  if nope:
863  missing=sorted(list(nope))
864  logger.warning('%s: does not have: %s'%(
865  archpath,', '.join(missing)))
866  if yup:
867  found=sorted(list(yup))
868  logger.warning('%s: has files: %s'%(
869  archpath,', '.join(found)))
870  if yup and not nope:
871  logger.info('%s: gleefully reporting all desired '
872  'files found.'%(archpath,))
873 
874 
875  def _impl_get_file(self,i,done,src,tgt,prio, loc, parsed, dc,streams,
876  archives,data,target_dc,realtime,logger,skip_existing):
877  """!Obtain one or more files.
878  @param i The index in done of the file being fetched
879  @param done an array of logical flags telling which files are transferred
880  @param src the source location
881  @param tgt the target location
882  @param prio the numerical priority
 883  @param loc the location URL of the data source (from the data catalog list)
884  @param parsed the parsed URL as output by urlparse.urlparse
885  @param dc the DataCatalog
886  @param streams the array of transfer streams
887  @param archives a double-nested dict of lists, mapping from
888  archive name to file part to index within done of the file
889  in question
890  @param target_dc the DataCatalog of the target locations
891  @param realtime True for FORECAST mode, False for HISTORY mode
892  @param logger the logging.Logger for log messages
893  @param skip_existing if True, do not re-download files that
894  already exist on disk (in the target_dc)"""
895  archsep=src.find('#')
896  if archsep>=0:
897  # This is in an archive, so we will have to stage
898  # the archive first, and get the file in the
899  # second pass.
900  arch=src[0:archsep]
901  filepart=src[archsep+1:]
902  if arch in archives and filepart in archives[arch]:
903  archives[arch][filepart].append(i)
904  else:
905  archives[arch][filepart]=[tgt,i]
906  else:
907  if src[0:5]=='htar:':
908  logger.warning("%s: no # in path - skipping this"
909  %(src,))
910  return
911  try:
912  if self.fetch_file(
913  streams,dc,loc,src,tgt,
914  logger=logger,realtime=realtime):
915  done.add(i)
916  except (EnvironmentError,ExitStatusException) as e:
917  if logger is not None:
918  logger.warning(
919  'fetching %s=>%s: %s'%(str(src),str(tgt),
920  str(e)),exc_info=True)
921 
922  def priotable(self,dclist):
923  """!Generates a string containing a human-readable, prioritized
924  list of data sources.
925  @param dclist The data source list from list_for()
926  @returns A multi-line string containing the table.
927 
928  Example:
929  Prioritized list of data sources:
930  PRIO- LOCATION = SOURCE @ DATES
931  100 - file:/// = DataCatalog(conf,'wcoss_fcst_PROD2014',2015080518) @ '1970010100-2038011818'
932  098 - file:/// = DataCatalog(conf,'wcoss_prepbufrnr_PROD2014',2015080518) @ '1970010100-2038011818'
933  097 - file:// = DataCatalog(conf,'zhan_gyre',2015080518) @ '2011060718-2011111200,2013051800-2013091018'"""
934  s=StringIO.StringIO()
935  s.write('Prioritized list of data sources:\nPRIO- LOCATION = SOURCE @ DATES\n')
936  for ( prio, loc, parsed, dc, dates ) in dclist:
937  s.write('%03d - %10s = %s @ %s\n'%(
938  int(prio),str(loc),repr(dc),repr(dates)))
939  sv=s.getvalue()
940  s.close()
941  return sv
942 
943  def get(self,data,target_dc,realtime=False,logger=None,
944  skip_existing=True):
945  """!Transfers the specified set of data to the specified
946  target. The "target_dc" is a DataCatalog that specifies the
947  destination filenames. The "realtime" argument is True for
948  FORECAST (real-time) mode runs, and False for HISTORY
949  (retrospective) mode runs. The "data" argument should be an
950  iterable (list, tuple, etc.) where each element is a dict-like
951  object that describes one file to obtain. Each dict contains:
952 
 953  * dataset --- string name of the dataset (gfs, gdas1, gefs,
 954  enkf, etc.)
 955  * item --- string name of the object (i.e.: sf, sfcanl, bufr)
 956  * atime --- Optional: a datetime.datetime specifying the
 957  analysis time. Default is the anltime from the
 958  InputSource's constructor.
 959  * ftime --- Optional: a datetime.datetime specifying the
 960  forecast time.
 961  * ...others... --- any other keyword arguments will be sent to
 962  the DataCatalog.locate() functions of this InputSource's
 963  DataCatalog objects."""
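 # A minimal usage sketch (see also the class docstring); "inputs" and "cycle"
 # are hypothetical names:
 #   ok=inputs.get([ {"dataset":"gfs","item":"gfs_sfcanl","atime":cycle} ],
 #                 target_dc,realtime=False)
 #   # ok is False if any non-optional input could not be obtained.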
964  if logger is None: logger=self._logger
965  dclist=self.list_for(realtime)
966  done=set()
967  logger.info(self.priotable(dclist))
968  for ( prio, loc, parsed, dc, dates ) in dclist:
969  assert(loc is not None)
970  assert(prio is not None)
971  assert(parsed is not None)
972  assert(dc is not None)
973  assert(dates is not None)
974  scheme=parsed.scheme
975  netpart=parsed.netloc
976  if scheme=='sftp':
977  if not self.rsync_check_access(netpart,logger):
978  logger.error('%s: cannot access; will skip'%(netpart,))
979  continue
980  elif scheme not in ['ftp','htar','file']:
981  logger.error('%s: invalid transfer mode %s; will skip'
982  %(loc,scheme,))
983  continue
984  streams=dict()
985  archives=collections.defaultdict(dict)
986  workpool=None
987  try:
988  with produtil.workpool.WorkPool(3,logger) as workpool:
989  i=0
990  seen=set()
991  for d in data:
992  i+=1
993  if i in done: continue # skip files we already
994  # transferred
995  assert('dates' not in d)
996  tgt=target_dc.locate(**d)
997  if tgt is None:
998  continue
999  if tgt in seen:
1000  if logger is not None:
1001  logger.info('%s: already processing this'%(tgt,))
1002  continue
1003  if os.path.exists(tgt) and skip_existing:
1004  if logger is not None:
1005  logger.info('%s: already exists'%(tgt,))
1006  done.add(i)
1007  continue
1008  if logger is not None:
1009  logger.debug("%s => %s"%(repr(d),repr(tgt)))
1010  src="(unknown)"
1011  if logger is not None:
1012  logger.debug('search for %s in %s'%(repr(d),repr(dc)))
1013  try:
1014  src=dc.locate(dates=dates,**d)
1015  except KeyError as k:
1016  logger.debug("%s: key error %s"%(src,str(k)))
1017  continue
1018  if src is None: continue
1019  if logger is not None:
1020  logger.info("SRC %s => %s"%(strsrc(d),repr(src)))
1021  seen.add(tgt)
1022  workpool.add_work(self._impl_get_file,args=[
1023  i,done,src,tgt,prio, loc, parsed, dc,streams,
1024  archives,data,target_dc,realtime,logger,
1025  skip_existing])
1026  workpool.barrier()
1027  for (archpath,parts) in archives.iteritems():
1028  if len(parts)<=0:
1029  if logger is not None:
1030  logger.info("%s: nothing to pull; skip"
1031  %(archpath,))
1032  continue
1033  workpool.add_work(self._impl_get_archive,args=[
1034  archpath,parts,done,prio, loc, parsed, dc,
1035  data,target_dc,realtime,logger,skip_existing])
1036  workpool.barrier()
1037  finally:
1038  if logger is not None:
1039  logger.warning('In finally block, closing streams.')
1040  for (key,stream) in streams.iteritems():
1041  try:
1042  stream.close()
1043  except Exception as e:
1044  if logger is not None:
1045  logger.warning(
1046  'Exception while closing stream %s: %s'
1047  %(key,str(e)),exc_info=True)
1048  del workpool
1049  i=0
1050  bad=False
1051  for d in data:
1052  i+=1
1053  if i in done:
1054  continue
 1055  tgt=target_dc.locate(**d)
 if tgt is None: continue # avoid os.path.exists(None) for unlocatable targets
 1056  if os.path.exists(tgt):
1057  continue
1058  if d.get('optional',False):
1059  if logger is not None:
1060  logger.info('missing optional data: %s'%(repr(d),))
1061  else:
1062  if logger is not None:
1063  logger.warning('MISSING INPUT: %s'%(repr(d),))
1064  bad=True
1065  return not bad
1066 
1067  def get_one(self,dataset,item,dest,logger=None,timeout=20,realtime=True,
1068  **kwargs):
1069  """!This is a simple wrapper around fetch_file that gets only
1070  one file. It will fail if the file requires pulling an
1071  archive.
1072  @param dataset the dataset to transfer
1073  @param item the desired item in the dataset
1074  @param dest the on-disk destination filename
1075  @param logger a logging.Logger for log messages
1076  @param timeout the connection timeout in seconds
1077  @param realtime True for FORECAST mode, False for HISTORY mode
1078  @param kwargs extra keyword arguments are passed to DataCatalog.locate()"""
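 # Illustrative sketch (names and paths are hypothetical):
 #   inputs.get_one("gfs","gfs_sfcanl","/path/to/stage/gfs.t00z.sfcanl",
 #                  atime="2015071800",realtime=False)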
1079  if logger is None: logger=self._logger
1080  streams=dict()
1081  try:
 1082  dclist=self.list_for(realtime)
 1083  for ( prio, loc, parsed, dc, dates ) in dclist:
1084  src=dc.locate(dataset=dataset,item=item,**kwargs)
1085  if src is None: continue
1086  archsep=src.find('#')
1087  if archsep>=0:
1088  raise NotImplementedError(
1089  'Source is in an archive. De-archiving is not '
1090  'supported by "get_one." Use "get" instead.')
1091  elif self.fetch_file(streams,dc,loc,src,dest,logger=logger):
1092  break
1093  finally:
1094  if logger is not None:
1095  logger.warning('In finally block, closing streams.')
1096  for (key,stream) in streams.iteritems():
1097  try:
1098  stream.close()
1099  except Exception as e:
1100  if logger is not None:
1101  logger.warning(
1102  'Exception while closing stream %s: %s'
1103  %(key,str(e)),exc_info=True)