HWRF
trunk@4391
Fetch data from multiple sources.
This class knows how to fetch data from remote clusters or from the local machine. The data locations are specified by several DataCatalog sections, each of which has a priority, a set of valid dates and a file transfer mechanism. Data catalogs are tried in priority order, highest priority first. Files are obtained by several threads at once, and several file transfer mechanisms are understood, selected by the URL scheme of each data location: file, sftp, ftp and htar.
However, only one DataCatalog is examined at a time. All threads work on that one DataCatalog until everything that can be obtained from it has been fetched. Then the threads exit, and new ones are spawned to examine the next DataCatalog.
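The search strategy described above can be sketched in plain Python. This is a minimal illustration, not the HWRF implementation: the fetch callback, the (priority, source) tuples and the function name fetch_by_priority are hypothetical, and a concurrent.futures thread pool stands in for whatever threading HWRF actually uses. A fresh pool is created for each source, so all workers focus on one source and exit before the next one is examined.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical fetch callback: returns True if `name` was obtained from `source`.
Fetcher = Callable[[str, str], bool]


def fetch_by_priority(sources: List[Tuple[int, str]], wanted: List[str],
                      fetch: Fetcher, nthreads: int = 4) -> Dict[str, Optional[str]]:
    """Sketch: try sources from highest to lowest priority, one source at a time.

    Returns a mapping from each wanted file to the source it came from,
    or None if no source could provide it.
    """
    got: Dict[str, Optional[str]] = {name: None for name in wanted}
    for _prio, source in sorted(sources, key=lambda s: s[0], reverse=True):
        missing = [name for name, src in got.items() if src is None]
        if not missing:
            break  # everything has been obtained; lower priorities are skipped
        # A new pool per source: all threads work on this one source, and the
        # pool is shut down (threads exit) before the next source is examined.
        with ThreadPoolExecutor(max_workers=nthreads) as pool:
            results = list(pool.map(lambda n: (n, fetch(source, n)), missing))
        for name, ok in results:
            if ok:
                got[name] = source
    return got
```

With a disk source at priority 90 and a tape source at priority 59, files present on disk come from disk and only the remainder is requested from tape, matching the ordering rule above.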
For example, suppose you are running a HISTORY (retrospective) simulation on the Jet supercomputer. You set up this configuration section in your hwrf.conf file:
and this is the code:
In this example, the InputSource will look for three GFS surface analysis files. It will search two possible locations for them: the on-disk Jet "PROD2014" history location and the NCO production tape files. The disk location will be searched first because its history priority is 90, while the tape area has a priority of 59.
Three files will show up eventually:
Each file will come from either here:
or here:
Public Member Functions
def __init__(self, conf, section, anltime, htar=None, logger=None, hsi=None)
  InputSource constructor.
def add(self, dc, location, fcstprio=None, histprio=None, dates=None)
  Adds a DataCatalog to this InputSource.
def open_ftp(self, netpart, logger=None, timeout=20)
  Opens an FTP connection.
def rsync_check_access(self, netpart, logger=None, timeout=20, dirpath='/')
  Checks to see if rsync can even access a remote server.
def fetch_file(self, streams, dc, dsurl, urlmore, dest, logger=None, timeout=20, realtime=True)
  Internal implementation function that fetches one file.
def list_for(self, realtime=True)
  Returns the list of DataCatalog objects for FORECAST or HISTORY mode.
def priotable(self, dclist)
  Generates a string containing a human-readable, prioritized list of data sources.
def get(self, data, target_dc, realtime=False, logger=None, skip_existing=True)
  Transfers the specified set of data to the specified target.
def get_one(self, dataset, item, dest, logger=None, timeout=20, realtime=True, **kwargs)
  This is a simple wrapper around fetch_file that gets only one file.
Public Attributes
conf
  The hwrf.config.HWRFConfig object used for configuration info.
section
  The section in conf that contains the data catalog list and relevant info.
anltime
  The default analysis time.
forecast
  List of forecast mode DataCatalog objects.
history
  List of history mode DataCatalog objects.
locks
  Lock objects to restrict access to FTP servers to one thread at a time.
htar
  A produtil.prog.ImmutableRunner that runs htar.
hsi
  A produtil.prog.ImmutableRunner that runs hsi.
valid
  Data source validity information.
def hwrf.input.InputSource.__init__(self, conf, section, anltime, htar=None, logger=None, hsi=None)
InputSource constructor.
conf | the hwrf.config.HWRFConfig to use for configuration info |
section | the section that specifies the list of data catalogs |
anltime | the default analysis time |
htar | the produtil.prog.Runner that runs htar |
logger | a logging.Logger for log messages |
hsi | the produtil.prog.Runner that runs hsi |
def hwrf.input.InputSource.add(self, dc, location, fcstprio=None, histprio=None, dates=None)
Adds a DataCatalog to this InputSource.
Called automatically from the constructor to add a DataCatalog to this InputSource. The list of add() calls is generated from the config section specified in the constructor. You should never need to call this function unless you want to explicitly add more DataCatalog objects that are not listed in the config files.
The location parameter is a URL whose scheme is file, sftp, ftp or htar. Examples:
dc | the DataCatalog object |
location | the URL of the data source, including the username if needed. |
fcstprio | the priority for using this source in FORECAST (real-time) mode. If missing or None, the source will not be used in FORECAST mode. |
histprio | the priority for using this source in HISTORY (retrospective) mode. If missing or None, the source will not be used in HISTORY mode. |
dates | Dates for which this source is valid. This is passed to the trange argument of in_date_range(t,trange). |
Definition at line 511 of file input.py.
Referenced by produtil.fileop.FileWaiter.add().
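As a rough sketch of the bookkeeping that add() describes, the hypothetical SourceRegistry below checks the URL scheme, records the source in the FORECAST and/or HISTORY list only when the corresponding priority is not None, and keeps the date string for later checks. The in_ranges helper is an assumed stand-in for in_date_range: its 'YYYYMMDDHH-YYYYMMDDHH[,...]' format is inferred from the date strings shown in the priotable() example, and it treats each range as inclusive. Neither name is part of the HWRF API.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Tuple
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"file", "sftp", "ftp", "htar"}

# (priority, location URL, catalog name, dates string); hypothetical layout
Entry = Tuple[int, str, str, Optional[str]]


def in_ranges(t: datetime, trange: str) -> bool:
    """Hypothetical stand-in for in_date_range: inclusive YYYYMMDDHH ranges."""
    for part in trange.split(","):
        start, end = part.strip().split("-")
        if datetime.strptime(start, "%Y%m%d%H") <= t <= datetime.strptime(end, "%Y%m%d%H"):
            return True
    return False


@dataclass
class SourceRegistry:
    """Sketch of the bookkeeping add() describes; not the HWRF class."""
    forecast: List[Entry] = field(default_factory=list)
    history: List[Entry] = field(default_factory=list)

    def add(self, dc: str, location: str, fcstprio: Optional[int] = None,
            histprio: Optional[int] = None, dates: Optional[str] = None) -> None:
        scheme = urlparse(location).scheme
        if scheme not in ALLOWED_SCHEMES:
            raise ValueError(f"unsupported URL scheme {scheme!r} in {location!r}")
        # A missing (None) priority means "never use this source in that mode".
        if fcstprio is not None:
            self.forecast.append((fcstprio, location, dc, dates))
        if histprio is not None:
            self.history.append((histprio, location, dc, dates))
```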
def hwrf.input.InputSource.fetch_file(self, streams, dc, dsurl, urlmore, dest, logger=None, timeout=20, realtime=True)
Internal implementation function that fetches one file.
You should not call this directly; it is meant to be called by get() and re-implemented in subclasses. It grabs one file, potentially from a remote location. The URL of the dataset's base directory is in dsurl, while the specific file is in urlmore. urlmore is joined to the path (file) part of dsurl via urljoin, and the resulting file is transferred.
streams | a list used to store opened streams |
dc | the DataCatalog being obtained |
dsurl | the URL of the DataCatalog |
urlmore | the rest of the URL relative to dsurl: the specific file, plus any reference (fragment) or HTTP GET query |
dest | The local disk destination |
logger | the logging.Logger for log messages |
timeout | the connection timeout in seconds |
realtime | True for FORECAST mode, False for HISTORY mode. |
Definition at line 611 of file input.py.
Referenced by hwrf.input.InputSource.get_one(), hwrf.input.InputSource.list_for(), and hwrf.input.InputSource.rsync_check_access().
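The URL-joining step can be illustrated with the standard library. One caveat motivates joining on the path alone: urllib.parse.urljoin only resolves relative references for schemes it knows are hierarchical, so for a custom scheme such as htar it simply returns urlmore unchanged. The helper join_dataset_url below is hypothetical and assumes that dsurl names a directory ending in '/'; the fragment in the htar example is illustrative only.

```python
from urllib.parse import urljoin, urlparse


def join_dataset_url(dsurl: str, urlmore: str) -> str:
    """Hypothetical helper: resolve urlmore against the path part of dsurl."""
    base = urlparse(dsurl)
    # Join on the path only. Calling urljoin(dsurl, urlmore) directly would
    # return urlmore unchanged for schemes urljoin does not treat as
    # hierarchical (such as "htar").
    joined = urljoin(base.path, urlmore)  # keeps any ?query or #fragment
    return f"{base.scheme}://{base.netloc}{joined}"
```

Because urljoin follows relative-reference rules, a dsurl without a trailing '/' would have its last path component replaced rather than extended.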
def hwrf.input.InputSource.get(self, data, target_dc, realtime=False, logger=None, skip_existing=True)
Transfers the specified set of data to the specified target.
The "target_dc" is a DataCatalog that specifies the destination filenames. The "realtime" argument is True for FORECAST (real-time) mode runs, and False for HISTORY (retrospective) mode runs. The "data" argument should be an iterable (list, tuple, etc.) where each element is a dict-like object that describes one file to obtain. Each dict contains:
dataset - string name of the dataset (gfs, gdas1, gefs, enkf, etc.)
item - string name of the object (e.g. sf, sfcanl, bufr)
atime - optional: a datetime.datetime specifying the analysis time. The default is the atime from the InputSource's constructor.
ftime - optional: a datetime.datetime specifying the forecast time.
...others... - any other keyword arguments are passed to the locate() method of each of this InputSource's DataCatalog objects.
Definition at line 944 of file input.py.
Referenced by hwrf.wrfbase.WRFDomains.__contains__(), hwrf.wrfbase.WRFDomains.__getitem__(), hwrf.wrfbase.WRFDomains.add(), hwrf.wrf.WRFSimulation.analysis_name(), produtil.datastore.UpstreamFile.check(), hwrf.regrib.GRIB1Product.getgrib1grbindex(), hwrf.regrib.GRIB1Product.getgrib1grid(), hwrf.regrib.GRIB1Product.getgrib1index(), hwrf.regrib.GRIB2Product.getgrib2grid(), hwrf.regrib.GRIB2Product.getgrib2index(), and hwrf.input.InputSource.priotable().
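A hypothetical helper that builds and checks the data argument before calling get() might look like this. It only enforces what the list above states: dataset and item are required, atime defaults to the InputSource's analysis time, ftime is optional, and any other keys pass through untouched. The function name normalize_requests is an assumption, not part of hwrf.input.

```python
import datetime as dt
from typing import Any, Dict, Iterable, List


def normalize_requests(data: Iterable[Dict[str, Any]],
                       default_atime: dt.datetime) -> List[Dict[str, Any]]:
    """Sketch: validate request dicts and fill in the default analysis time."""
    out: List[Dict[str, Any]] = []
    for req in data:
        if "dataset" not in req or "item" not in req:
            raise KeyError("each request needs 'dataset' and 'item'")
        r = dict(req)  # copy so the caller's dicts are left untouched
        r.setdefault("atime", default_atime)
        # ftime and any extra keys are kept as-is for the catalogs' locate()
        out.append(r)
    return out
```

The resulting list would then be passed as the data argument, e.g. source.get(requests, target_dc, realtime=False), where source stands for an existing InputSource and target_dc for an existing DataCatalog.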
def hwrf.input.InputSource.get_one(self, dataset, item, dest, logger=None, timeout=20, realtime=True, **kwargs)
This is a simple wrapper around fetch_file that gets only one file.
It will fail if the file requires pulling an archive.
dataset | the dataset to transfer |
item | the desired item in the dataset |
dest | the on-disk destination filename |
logger | a logging.Logger for log messages |
timeout | the connection timeout in seconds |
realtime | True for FORECAST mode, False for HISTORY mode |
kwargs | extra keyword arguments are passed to DataCatalog.locate() |
Definition at line 1068 of file input.py.
Referenced by hwrf.input.InputSource.get().
def hwrf.input.InputSource.list_for(self, realtime=True)
Returns the list of DataCatalog objects for FORECAST or HISTORY mode.
realtime | True for FORECAST mode, False for HISTORY |
Definition at line 787 of file input.py.
Referenced by hwrf.input.InputSource.get(), and hwrf.input.InputSource.get_one().
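Selecting and ordering catalogs for one mode can be sketched as below, using the Jet example from the top of this page (disk history priority 90, tape history priority 59). The (priority, name) tuples and catalog names are hypothetical, and this page does not say whether list_for() itself sorts or whether ordering happens later in get(); the sketch only shows the resulting order.

```python
from typing import List, Tuple

Entry = Tuple[int, str]  # (priority, catalog name); hypothetical layout


def list_for(forecast: List[Entry], history: List[Entry],
             realtime: bool = True) -> List[Entry]:
    """Sketch: pick the FORECAST or HISTORY list, highest priority first."""
    chosen = forecast if realtime else history
    return sorted(chosen, key=lambda e: e[0], reverse=True)
```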
def hwrf.input.InputSource.open_ftp(self, netpart, logger=None, timeout=20)
Opens an FTP connection.
Opens the specified ftp://user@host/... request subject to the specified timeout, logging to the specified logger if one is given (not None).
netpart | The netpart portion of the URL |
logger | the logging.Logger for log messages |
timeout | the connection timeout in seconds |
Definition at line 556 of file input.py.
Referenced by hwrf.input.InputSource.fetch_file().
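A minimal sketch of opening an FTP connection from the netpart of an ftp://user@host/... URL, using Python's standard ftplib. The split_netpart and open_ftp_sketch names are hypothetical, IPv6 literals are not handled, and password handling is deliberately left out because this page does not say how HWRF authenticates.

```python
import ftplib
from typing import Optional, Tuple


def split_netpart(netpart: str) -> Tuple[Optional[str], str, Optional[int]]:
    """Split 'user@host:port' into (user, host, port); user and port may be None."""
    user, _, hostport = netpart.rpartition("@")
    host, sep, port = hostport.partition(":")
    return (user or None, host, int(port) if sep else None)


def open_ftp_sketch(netpart: str, timeout: float = 20) -> ftplib.FTP:
    """Connect and log in; raises on connection errors or timeouts."""
    user, host, port = split_netpart(netpart)
    ftp = ftplib.FTP(timeout=timeout)  # no host given, so nothing connects yet
    ftp.connect(host, port or 21)      # uses the timeout set above
    ftp.login(user or "anonymous")     # password handling omitted on purpose
    return ftp
```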
def hwrf.input.InputSource.priotable(self, dclist)
Generates a string containing a human-readable, prioritized list of data sources.
dclist | The data source list from list_for() |
Example:
Prioritized list of data sources:
PRIO- LOCATION = SOURCE @ DATES
100 - file:/// = DataCatalog(conf,'wcoss_fcst_PROD2014',2015080518) @ '1970010100-2038011818'
098 - file:/// = DataCatalog(conf,'wcoss_prepbufrnr_PROD2014',2015080518) @ '1970010100-2038011818'
097 - file:// = DataCatalog(conf,'zhan_gyre',2015080518) @ '2011060718-2011111200,2013051800-2013091018'
Definition at line 922 of file input.py.
Referenced by hwrf.input.InputSource.get().
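The table format in the example above can be reproduced with a few lines of string formatting. This is a hypothetical re-creation that works on plain (priority, location, source, dates) tuples rather than real DataCatalog objects; the zero-padded three-digit priority and the quoted date string mirror the example output, and rows are sorted highest priority first.

```python
from typing import Iterable, Tuple

Row = Tuple[int, str, str, str]  # (priority, location, source description, dates)


def priotable(rows: Iterable[Row]) -> str:
    """Sketch: format data sources as a prioritized, human-readable table."""
    lines = ["Prioritized list of data sources:", "PRIO- LOCATION = SOURCE @ DATES"]
    for prio, location, source, dates in sorted(rows, key=lambda r: r[0], reverse=True):
        # %03d renders 98 as 098; %r wraps the date string in single quotes.
        lines.append("%03d - %s = %s @ %r" % (prio, location, source, dates))
    return "\n".join(lines)
```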
def hwrf.input.InputSource.rsync_check_access(self, netpart, logger=None, timeout=20, dirpath='/')
Checks to see if rsync can even access a remote server.
netpart | the netpart portion of the URL |
logger | the logging.Logger for log messages |
timeout | the connection timeout in seconds |
Definition at line 594 of file input.py.
Referenced by hwrf.input.InputSource.fetch_file(), and hwrf.input.InputSource.get().
hwrf.input.InputSource.anltime
The default analysis time.
Definition at line 395 of file input.py.
Referenced by hwrf.wrfbase.WRFOutput.__repr__(), hwrf.fcsttask.WRFAnl.wrfanl_at_time(), and hwrf.fcsttask.AnalysisCycle.wrfinput_at_time().
hwrf.input.InputSource.forecast
List of forecast mode DataCatalog objects.
Definition at line 402 of file input.py.
Referenced by hwrf.input.InputSource.list_for().
hwrf.input.InputSource.history
List of history mode DataCatalog objects.
Definition at line 404 of file input.py.
Referenced by hwrf.input.InputSource.list_for().
hwrf.input.InputSource.hsi
A produtil.prog.ImmutableRunner that runs hsi.
Definition at line 410 of file input.py.
Referenced by hwrf.input.InputSource.list_for().
hwrf.input.InputSource.htar
A produtil.prog.ImmutableRunner that runs htar.
Definition at line 409 of file input.py.
Referenced by hwrf.input.InputSource.list_for().
hwrf.input.InputSource.locks
Lock objects to restrict access to FTP servers to one thread at a time.
Definition at line 406 of file input.py.
Referenced by hwrf.input.InputSource.fetch_file().
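One way to implement per-server locking of the kind the locks attribute describes is a dictionary of threading.Lock objects keyed by server, guarded by one extra lock so that two threads cannot create different locks for the same server. The ServerLocks class and its lock_for method are hypothetical; this page does not say how HWRF keys or creates its locks.

```python
import threading
from typing import Dict


class ServerLocks:
    """Sketch: hand out one lock per FTP server (keyed by netpart)."""

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects creation of per-server locks
        self._locks: Dict[str, threading.Lock] = {}

    def lock_for(self, netpart: str) -> threading.Lock:
        """Return the single lock for this server, creating it on first use."""
        with self._guard:
            if netpart not in self._locks:
                self._locks[netpart] = threading.Lock()
            return self._locks[netpart]
```

A transfer thread would wrap its FTP session in `with locks.lock_for(netpart): ...`, so at most one thread talks to a given server at a time while different servers can still be used concurrently.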
hwrf.input.InputSource.valid