1 """!This module provides a set of utility functions to do filesystem
2 operations. It replaces or improves upon several os, stat, and sys
3 module functions by working around Python bugs, providing an API layer
4 that allows forward compatibility to future Python versions, and
5 adding logging capabilities."""
9 __all__=[
'FileOpError',
'FileOpErrors',
'CannotLinkMulti',
10 'UnexpectedAbsolutePath',
'InvalidExecutable',
11 'FindExeInvalidExeName',
'CannotFindExe',
'RelativePathError',
12 'DeliveryFailed',
'VerificationFailed',
'realcwd',
'chdir',
13 'makedirs',
'remove_file',
'rmall',
'lstat_stat',
'isnonempty',
14 'check_file',
'deliver_file',
'make_symlinks_in',
'find_exe',
15 'make_symlink',
'replace_symlink',
'unblock',
'fortcopy',
16 'norm_expand_path',
'norm_abs_path',
'check_last_lines',
17 'wait_for_files',
'FileWaiter',
'call_fcntrl',
'gribver',
20 import os,tempfile,filecmp,stat,shutil,errno,random,time,fcntl,math,logging
23 module_logger=logging.getLogger(
'produtil.fileop')
28 """!This is the superclass of several exceptions relating to
29 multi-file operations in produtil.fileop."""
31 """!FileOpError constructor
32 @param message the error message
33 @param filename the name of the problematic file
34 @param more a list of tuples, (from,to,message) where from is
35 the source file, to is the destination file and message is a
36 description of the problem with that pair."""
52 """!A string description of the problem."""
55 """!Iterates over a list of tuples, (from,to,message) where
56 from is the source file, to is the destination file and
57 message is a description of the problem with that pair."""
58 for fromfile,tofile,message
in self.
more:
59 yield fromfile,tofile,message
61 """!This exception is raised when an operation that processes
62 multiple files catches more than one exception."""
63 class CannotLinkMulti(FileOpError):
64 """!This exception is raised when the caller tries to create
65 multiple symlinks in a single target, but the target is not a
68 """!This exception indicates that the renamer function sent to
69 make_symlinks_in returned an absolute path."""
71 """!Thrown when a find_exe fails."""
73 """!Thrown when find_exe is given an executable name that contains
76 """!Thrown when find_exe cannot find an executable in the path or
79 """!Raised when a relative path is given, but an absolute path is
83 """!Raised when os.symlink makes a symlink to a target other than
84 the one that was requested. This is present to detect a bug in
85 Cray where os.symlink randomly makes a symlink to the wrong
89 """!This exception is raised when a file cannot be delivered."""
91 """!DeliveryFailed constructor.
92 @param message why the delivery failed
93 @param fromfile what was being delivered
94 @param tofile delivery destination"""
108 """!Human-readable description of this error."""
109 return '%s: cannot deliver (from %s): %s'%(
112 """!Pythonic representation of this error."""
113 return 'DeliveryFailed(%s,%s,%s)' % \
117 """!This exception is raised when a copy of a file has different
118 content than the original."""
119 def __init__(self,message,fromfile,tofile,verifyfile):
120 """!VerificationFailed constructor.
121 @param message why the delivery failed
122 @param fromfile what was being delivered
123 @param tofile delivery destination
124 @param verifyfile temporary file that failed verification"""
125 DeliveryFailed.__init__(self,message,fromfile,tofile)
131 """!Human-readable description of this error."""
132 return '%s: verification failed on temporary file %s (from %s): %s'%\
135 """!Pythonic representation of this error."""
136 return 'VerificationFailed(%s,%s,%s,%s)'%\
141 """!Returns the current working directory, expanding any symbolic
143 return os.path.realpath(os.getcwd())
146 """!Changes to the specified directory. Please use
147 produtil.cd.NamedDir instead.
149 This is generally not a good idea since you will not cd back if an
150 unhandled exception is raised. It is better to use the
151 produtil.cd module, which provides ways to enter a directory in a
152 "with" block and optionally delete it afterwards. Such
153 functionality could also be implemented via a try...finally block.
154 @param path the path to cd to
155 @param logger a logging.Logger for log messages"""
158 if logger
is not None: logger.info(path+
': cd here')
160 except EnvironmentError
as e:
161 logger.warning(path+
': cannot cd: '+str(e),exc_info=
True)
165 """!Open the file for append and set mtime and atime.
167 Opens the specified file in append mode, but writes nothing. Sets
168 the access and modification times.
170 @param filename the string filename
171 @param times A 2-tuple of numbers, of the form (atime, mtime).
172 These are UNIX epoch times (seconds since 1970 began in UTC)."""
173 with open(filename,
'a'):
174 os.utime(filename, times)
178 """!What is the NetCDF version of this file?
180 Returns one of three strings based on the NetCDF version of the
181 given file, or returns None if the file is not NetCDF:
182 * "CDF1" = NetCDF classic format
183 * "CDF2" = NetCDF 64-bit offset format
184 * "HDF5" = HDF5 file, and hence possibly a NetCDF4 file.
185 * None = Not NetCDF and not HDF5
186 @param filename the name of the file to test"""
187 with open(filename,
'rb')
as f:
194 elif four==
'CDF\x02':
196 elif eight==
'\x89\x48\x44\x46\x0d\x0a\x1a\x0a':
202 """!What is the GRIB version of this file?
204 Returns the GRIB file version: 1 or 2. If the file is not a GRIB
205 file, or if the answer is indeterminate, returns None. Only the
206 first GRIB record is tested.
207 @param filename the path to the file to test"""
208 if not isinstance(filename,basestring):
209 raise TypeError(
'The first argument to gribver should be '
210 'a filename. You provided a %s %s.'%
211 (type(filename).__name__,repr(filename)))
214 with open(filename,
'rb')
as f:
216 if eight==
'GRIB\x00\x00\x00\x02':
218 elif eight[0:4]==
'GRIB':
225 """!Make a directory tree, working around filesystem bugs.
227 This makedirs implementation works around a common bug: if two
228 processes try to recursively make a directory tree simultaneously,
229 makedirs can fail when two processes make the same path component
230 at the same time. This implementation automatically retries in
232 @param filename the directory path
233 @param numtries the number of times to retry
234 @param logger a logging.Logger for log messages"""
235 for n
in xrange(numtries):
237 if not os.path.isdir(filename):
238 if logger
is not None:
239 logger.info(filename+
': make directory and parents')
240 os.makedirs(filename)
241 except EnvironmentError
as e:
242 if os.path.isdir(filename):
244 elif os.path.exists(filename):
252 """!Deletes the specified file.
254 Does nothing if the filename is None, is the empty string or
255 already does not exist. Otherwise, the file is deleted.
256 @param filename The file to delete.
257 @param info Optional: indicates that warnings about a file already
258 not existing should be sent to the logger at INFO level
259 (info=True) instead of WARNING (info=False).
260 @param logger the logging.Logger for messages"""
261 if filename
is None or filename==
'':
264 if logger
is not None: logger.info(
'%s: remove file'%(filename,))
266 except EnvironmentError
as e:
267 if e.errno!=errno.ENOENT:
268 if logger
is not None:
269 logger.warning(
'%s: cannot remove: %s'%(filename,str(e)),
272 if logger
is not None:
273 log=logger.info
if info
else logger.warning
274 log(
'%s: cannot remove; does not exist: %s'%(filename,str(e)))
277 """!Deletes the specified list of files.
279 Deletes files listed in "args". Each one is passed to
280 remove_file. Exceptions that derive from EnvironmentError are
281 collected, and will be raised at the end, thus allowing removal of
282 later files to continue if earlier ones failed. If only one file
283 causes an exception, that exception will be raised, otherwise
284 FileOpErrors will be raised
285 @param args The files to delete.
286 @param kwargs Keyword arguments passed to remove_file()."""
287 logger=kwargs.get(
'logger',
None)
288 if logger
is not None:
289 logger.info(
'Removing %d files...'%(len(args),))
294 except EnvironmentError
as e:
295 ex.append( (arg,
None,e) )
299 msg=
'Multiple exceptions caught while deleting files in rmall.'
300 if logger
is not None: logger.warning(msg)
302 [ (a,b,str(c))
for a,b,c
in ex ] )
303 if logger
is not None:
304 logger.info(
'Done removing %d files...'%(len(args),))
308 """!Runs lstat and stat on a file as efficiently as possible.
310 Returns (lstat(filename),stat(filename)) where each is None if it
311 fails due to non-existence. Does this in as few filesystem
312 metadata operations as possible. Will raise an exception if the
313 stat fails for any reason other than non-existence of a file, or
314 if the file or linked file is non-existent and
316 @param filename The file to test.
317 @param raise_nonexist Should we raise an exception if the file does not exist?
318 @returns a tuple (L,S) where L is the lstat return value, and S is
319 the stat return value. Each will be None if the file or link
320 target do not exist."""
321 assert(filename
is not None)
322 (xlstat,xstat)=(
None,
None)
324 xlstat=os.lstat(filename)
325 if not stat.S_ISLNK(xlstat.st_mode):
326 return (xlstat,xlstat)
327 xstat=os.stat(filename)
328 except EnvironmentError
as e:
329 if raise_nonexist
or e.errno!=errno.ENOENT:
331 return (xlstat,xstat)
334 """!Returns True if the filename refers to an existent file that is
335 non-empty, and False otherwise.
336 @param filename The file to test."""
337 if filename
is None:
return None
339 if sfile
is None or sfile==
'':
return None
341 ret=s
is not None and s.st_size>0
342 if not ret
and sync_frequently:
344 '%s: is empty or does not exist. Will sync and recheck.'
348 ret=s
is not None and s.st_size>0
350 '%s: isnonempty=%s; s=%s s.st_size=%s'
351 %(filename,repr(ret),repr(s),repr(
None if s
is None else s.st_size)))
355 def deliver_file(infile,outfile,keep=True,verify=False,blocksize=1048576,
356 tempprefix=
None,permmask=002,removefailed=
True,
357 logger=
None,preserve_perms=
True,preserve_times=
True,
358 preserve_group=
None,copy_acl=
None,moveok=
True,
359 force=
True, copier=
None):
360 """!This moves or copies the file "infile" to "outfile" in a unit
361 operation; outfile will never be seen in an incomplete state.
363 If the caller specifies keep=False (default is True) and
364 moveok=True, and the source and destination are on the same
365 filesystem then the delivery is done with a simple move.
366 Otherwise a copy is done to a temporary file on the same
367 filesystem as the target. If verification is requested
368 (verify=True) then the temporary file is verified by filecmp.cmp,
369 before moving the temporary file to the final location.
371 When requested, and when possible, the permissions and ownership
372 are preserved. Both copy_acl and preserve_group have defaults set
373 by the produtil.cluster module. If the cluster uses access
374 control lists for data restriction classes, then copy_acl will be
375 set to True, otherwise it is false. If group quotas are enabled,
376 preserve_group is False, otherwise it is True.
378 @note The original file is never deleted, but it may be moved to
379 the target if keep=False. If a copy is done instead, the original
380 file is still present.
382 @param infile the origin file
383 @param outfile the destination file or its parent directory
384 @param keep If False, the original file is no longer needed. If False
385 and moveok=True, the file might be delivered by a "mv"
386 operation, avoiding any data duplication (no "cp").
387 @param verify If a "cp" is done, reopen the target and source and
388 verify they are the same. Note that providing a copier will
389 break the verification functionality if the copier changes the
390 contents of the destination file (such as a copier that compresses).
391 @param blocksize block size during copy operations
392 @param tempprefix Prefix for temporary files during copy operations.
393 Do not include directory paths in the tempprefix.
394 @param permmask Permission bits to remove. Default: world write (002)
395 @param removefailed If True, delete temporary files if the delivery fails
396 @param logger the logging.Logger for log messages.
397 @param preserve_perms If True, copy the old file's permissions to
399 @param preserve_times If True, copy the old file's timestamps to
401 @param preserve_group If True, copy the old file's group ID to the
403 @param copy_acl If True, copy the access control lists from one file
405 @param moveok If True, delivery by "mv" is allowed. Must also set
407 @param force If False, delivery will be aborted (raise
408 TargetFileExists) if the target file already exists.
409 @param copier If present, this function or callable object is used to
410 copy data from the source file to the temporary file before moving
411 it to the target. The copier is called as
412 copier(infile,temp_file_name,temp_file_object)
413 Where the temp_file_name is the name of the destination file and
414 the temp_file_object is an object that can be used to write to
415 the file. The copier should NOT close the temp_file_object. """
416 if preserve_group
is None:
420 if copier
is not None:
425 assert(infile
is not None)
426 assert(outfile
is not None)
427 inbase=os.path.basename(infile)
428 (ilstat,istat)=
lstat_stat(infile,raise_nonexist=
True)
430 if stat.S_ISDIR(istat.st_mode):
431 raise DeliveryFailed(
'This subroutine cannot deliver directories.',
436 actual_outfile=outfile
439 (odlstat,odstat)=(
None,
None)
440 if ofstat
is not None:
441 if stat.S_ISDIR(ofstat.st_mode):
442 outdir=actual_outfile
443 actual_outfile=os.path.join(outfile,inbase)
444 (odlstat,odstat)=(oflstat,ofstat)
445 if logger
is not None:
446 logger.debug(
'%s: is a directory; file is %s'
450 outdir=os.path.dirname(outfile)
451 if len(outdir)<1: outdir=
'.'
452 if logger
is not None:
453 logger.debug(
'%s: exists, so parent %s must exist and be a '
454 'directory'%(outfile,outdir))
455 (odlstat,odstat)=
lstat_stat(outdir,raise_nonexist=
True)
458 'target does not exist.',infile,outfile)
459 if not stat.S_ISDIR(odstat.st_mode):
461 'target is not a directory.',infile,outfile)
463 if odstat
is not None and not force:
464 if logger
is not None:
465 logger.debug(
'%s: exists and overwrite (force) is disabled. '
466 'Aborting delivery.'%(actual_outfile,))
471 if ofstat
is not None:
472 if stat.S_ISLNK(ofstat.st_mode):
473 if logger
is not None:
474 logger.info(
'%s: destination is a link, will recopy as '
475 'a non-link.'%(actual_outfile,))
476 elif os.path.samestat(istat,ofstat):
477 if logger
is not None:
478 logger.info(
'%s: same as %s'%(actual_outfile,infile))
480 if logger
is not None:
481 logger.info(
'%s: exists, replacing with %s'%(
482 actual_outfile,infile))
483 elif logger
is not None:
484 logger.debug(
'%s: does not exist'%(actual_outfile))
490 samefs = (istat.st_dev == odstat.st_dev)
491 if samefs
and not keep
and moveok:
492 if stat.S_ISLNK(ilstat.st_mode):
493 if logger
is not None:
494 logger.info(
'%s: cannot deliver via "os.rename" since '
495 'source is a link.'%(infile,))
497 if logger
is not None:
498 logger.info(
'%s: move from %s'%(actual_outfile,infile))
500 os.rename(infile,actual_outfile)
502 except EnvironmentError
as e:
503 if logger
is not None:
504 logger.info(
'%s: could not deliver by os.rename: %s'
505 %(actual_outfile,str(e)))
515 if tempprefix
is None:
516 tempprefix=
"tmp."+inbase+
".part."
517 temp=tempfile.NamedTemporaryFile(prefix=tempprefix,
518 delete=
False,dir=outdir)
520 if logger
is not None:
521 logger.info(
'%s: copy to temporary %s'%(infile,tempname))
523 with open(infile,
'rb')
as indata:
524 shutil.copyfileobj(indata,temp,length=blocksize)
526 copier(infile,tempname,temp)
531 if logger
is not None:
532 logger.info(
'%s: verify copy %s'%(infile,tempname))
533 if not filecmp.cmp(infile,tempname):
535 infile,actual_outfile,tempname)
536 if logger
is not None:
537 logger.info(
'%s: copy group ID and permissions to %s'
542 os.chown(tempname,-1,istat.st_gid)
543 except(IOError,OSError)
as e:
546 if logger
is not None:
547 logger.warning(
'%s: cannot copy groupid to %s: %s'
548 % (infile,tempname,str(e)))
550 os.chmod(tempname,istat.st_mode&~permmask)
553 if logger
is not None:
554 logger.info(
'%s: move from %s'%(actual_outfile,tempname))
556 os.utime(tempname,(istat.st_atime,istat.st_mtime))
557 os.rename(tempname,actual_outfile)
559 except Exception
as e:
560 if logger
is not None:
561 logger.error(
'%s: delivery failed: %s'%(infile,str(e)))
567 if removefailed
and tempname
is not None:
569 except EnvironmentError
as e:
573 def find_exe(name,dirlist=None,raise_missing=True):
574 """!Searches the $PATH or a specified iterable of directory names
575 to find an executable file with the given name.
577 Returns the executable's location. If the executable cannot be
578 found, and raise_missing=True, raises CannotFindExe, otherwise
579 returns None. Raises FindExeInvalidExeName if "name" is not the
580 same as its os.path.basename.
582 @param name The name of the executable to find.
583 @param dirlist The list of directories to search, or None to search $PATH
584 @param raise_missing If True, the CannotFindExe exception is
585 raised for executables that cannot be found. If False, return
586 None in that situation."""
587 bn=os.path.basename(name)
590 'executable name is not the same as its basename in '
591 'find_exe (basename=%s)'%(bn,),name)
593 dirlist=os.environ[
'PATH'].split(
':')
594 for dirname
in dirlist:
595 if dirname==
'': dirname=
'.'
596 exename=os.path.join(dirname,name)
597 if os.path.isfile(exename)
and os.access(exename,os.X_OK):
599 if not raise_missing:
return None
605 """!Opens the specified file for reading and attempts to read data
606 from it. Logs the process. Will NOT raise any I/O or system
607 errors; they are ignored. This is a workaround for a bug in Cray:
608 symlinks to recently created files cannot be read by the compute
609 node unless the batch node reads from them first (or unless you
612 with open(filename,
'rt')
as f:
614 if logger
is not None:
615 logger.info(
'%s: read %s'%(filename,repr(buf)))
616 except EnvironmentError
as e:
619 def make_symlinks_in(sources,targetdir,force=False,renamer=None,logger=None,
621 """!Creates symbolic links from a set of source files to a target
622 directory. If "force" is True, then any existing files will first
625 The "renamer" can be a function that generates paths of the
626 symlinks, relative to targetdir, for each symlink in "sources".
627 If the return value from "renamer" is an absolute path, an
628 exception will be thrown. If the return value is None, then no
632 make_symlinks_in(['/path/to/a','/path/to/b'],'.',
633 renamer=lambda s: os.path.basename(s)+'.linkified')
635 will create a.linkified, linked to /path/to/a, and b.linkified,
636 linked to /path/to/b in directory "."
637 @param sources The list of files to link to.
638 @param targetdir The directory in which to place the links.
639 @param force Remove existing files if needed.
640 @param renamer Function to generate link names.
641 @param logger A logging.Logger for log messages.
642 @param copy If True, files are copied instead of linked."""
643 (tlstat,tstat)=
lstat_stat(targetdir,raise_nonexist=
True)
644 if not stat.S_ISDIR(tstat.st_mode):
647 for source
in sources:
650 if renamer
is not None:
651 target=renamer(source)
652 if target
is None:
continue
653 if os.path.isabs(target):
655 'renamed path is absolute',renamed)
656 target=os.path.join(targetdir,target)
658 target=os.path.join(targetdir,os.path.basename(source))
660 if os.path.exists(source):
661 if force
and os.path.exists(target):
664 elif logger
is not None:
666 '%s: skip. Am in copy mode, and source does not exist (%s)'%(
670 except EnvironmentError
as e:
671 errors.append( (source,str(target),str(e)) )
672 if logger
is not None:
673 logger.warning(str(e),exc_info=
True)
675 raise FileOpError(
'cannot link files',targetdir,errors)
678 """!Creates a symbolic link "target" that points to "source". If
679 the target already exists and is NOT a directory, then the file
680 will be replaced. The replacement is done in a unit operation so
681 that the target will always exist (unless the operation fails).
682 @param source The file to link to.
683 @param target The name of the link.
684 @param force If True, and target exists, replace it.
685 @param logger a logging.Logger for log messages."""
686 if logger
is not None:
687 logger.info(
'link %s -> %s'%(target,source))
688 if os.path.isdir(target):
689 target=os.path.join(target,os.path.basename(source))
690 if logger
is not None:
691 logger.info(
'Target is a directory. Symlink to %s instead'%(target,))
693 os.symlink(source,target)
694 content=os.readlink(target)
696 if sync_frequently
and max_tries>1:
697 if logger
is not None:
698 msg=
"Filesystem failure (will retry): Cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
699 target,source,content)
702 naptime=random.randrange(100,300)*1e-4
703 if logger
is not None:
704 logger.info(
'Sleep %f seconds'%(naptime,))
708 elif sync_frequently
and max_tries==1:
709 return ln_sf(source,target,content,logger=logger)
711 raise WrongSymlink(
"FILESYSTEM FAILURE: Cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
712 target,source,content),target)
715 except EnvironmentError
as e:
716 if not e.errno==errno.EEXIST
or not force:
719 if logger
is not None:
720 logger.info(
'target exists - using replace_symlink instead')
724 """!Do not call this routine directly: you want make_symlink
725 instead. This routine creates a new symbolic link and renames
726 that link to "target." That always replaces target with a
727 symbolic link to source, even if target did not already exist.
728 @param source the file the link will point to
729 @param target the name of the link to create or replace
730 @param logger a logging.Logger for messages"""
731 tempname=os.path.join(os.path.dirname(target),
732 'tmp.%s.%06x.%06x.tmp' % ( os.path.basename(target),
733 random.getrandbits(32),random.getrandbits(32)))
735 if logger
is not None:
736 logger.info(
'link %s -> %s'%(tempname,source))
737 os.symlink(source,tempname)
738 content=os.readlink(tempname)
740 if sync_frequently
and max_tries>1:
741 if logger
is not None:
742 msg=
"Filesystem failure (will retry): Cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
743 tempname,source,content)
746 naptime=random.randrange(100,300)*1e-4
747 if logger
is not None:
748 logger.info(
'Sleep %f seconds'%(naptime,))
752 elif sync_frequently
and max_tries==1:
753 return ln_sf(source,target,content,logger=logger)
755 raise WrongSymlink(
"FILESYSTEM FAILURE: Cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
756 tempname,source,content),tempname)
757 if logger
is not None:
758 logger.info(
'rename %s to %s'%(tempname,target))
759 os.rename(tempname,target)
761 except Exception
as e:
762 if isinstance(e,WrongSymlink):
raise
764 if logger
is not None:
765 logger.info(
'failed: delete %s'%(tempname,))
767 except EnvironmentError:
pass
770 def ln_sf(source,target,content,logger=None):
771 if logger
is not None:
772 msg=
"Filesystem failure (will retry with sync and ln -sf): Cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
773 target,source,content)
779 content=os.readlink(target)
780 if content==source:
return True
782 "FILESYSTEM FAILURE: Python and ln -sf both cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
783 target,source,content),target)
785 raise WrongSymlink(
'FILESYSTEM FAILURE: Cannot link \"%s\" -> \"%s\". Last resort of ln -sf failed: return status %s'%(
786 source,target,repr(result)))
790 """!Attempts to modify the given stream to be non-blocking. This
791 only works with streams that have an underlying POSIX fileno, such
794 Will re-raise any exception received, other than AttributeError
795 and EnvironmentError. Hence, I/O errors and attempts to make a
796 non-fileno stream non-blocking will produce a False return value,
797 while anything else will raise an exception.
799 @param stream the stream to unblock
800 @param logger a logging.Logger for log messages
801 @returns True on success, False otherwise."""
805 """!Internal function that implements unblock()
806 @param stream the stream to modify
807 @param on flags to turn on
808 @param off flags to turn off
809 @param logger a logging.Logger for messages
810 @returns True on success, False otherwise."""
812 if isinstance(stream,int):
816 except (AttributeError,EnvironmentError)
as ee:
817 if logger
is not None:
818 logger.warning(
'%s: stream has no fileno, cannot switch to '
819 'non-blocking I/O: %s'%
820 (repr(stream),str(ee)),exc_info=
True)
824 flags=fcntl.fcntl(fd, fcntl.F_GETFL)
825 fcntl.fcntl(fd, fcntl.F_SETFL, (flags|on) & ~off)
827 except EnvironmentError
as ee:
828 if logger
is not None:
829 logger.error(
'%s: cannot switch to non-blocking I/O: %s'%
830 (repr(stream),str(ee)),exc_info=
True)
834 def fortlink(forts,force=False,basedir=None,logger=None):
835 """!This is a convenience routine that makes many symbolic links to
836 fort.N files for various integers N using make_symlink. It works
837 similarly to fortcopy. The optional basedir is the relative
838 directory. The optional force argument is passed on to
839 make_symlink and has the usual meaning: replace existing files.
843 fortlink({ 15:"/usr/local/share/file1",
847 And you will create these symbolic links:
849 ./fort.15 -> /usr/local/share/file1
853 as with other symlink routines in this module, set force=True to
854 remove target fort.N files if they already exist.
855 @param forts Mapping from Fortran unit number to link target.
856 @param force Remove target files if they exist.
857 @param basedir Where to make the links instead of the current directory.
858 @param logger A logging.Logger for log messages."""
859 if logger
is not None:
860 logger.debug(
'in fortlink, forts=%s force=%s basedir=%s logger=%s'%(
861 repr(forts),repr(force),repr(basedir),repr(logger)))
862 for (i,filename)
in forts.iteritems():
863 assert(isinstance(filename,basestring))
864 link=
'fort.%d'%(int(i),)
865 if basedir
is not None: link=os.path.join(basedir,where)
868 def fortcopy(forts,basedir=None,logger=None,only_log_errors=False,**kwargs):
869 """!A convenience function for copying files to local fort.N files
870 for various integers N using deliver_file(...,keep=True). It
871 works similarly to fortlink. The force= argument tells fortcopy
872 to overwrite existing files. Otherwise, an exception will be
873 raised if the destination file already exists. The optional
874 basedir argument is the parent directory of the fort.N.
878 fortcopy({ 15:"/usr/local/share/file1",
882 And you will create files:
884 ./fort.15 (copied from /usr/local/share/file1)
885 ./fort.23 (copied from ./file2)
888 All other keyword arguments are sent to deliver_file.
889 @param forts Mapping from Fortran unit number to the file to copy from.
890 @param basedir Where to put the files instead of the current directory.
891 @param logger A logging.Logger for log messages.
892 @param only_log_errors Only log failed operations instead of logging everything.
893 @param kwargs All other keyword arguments are passed to deliver_file()"""
894 for (i,filename)
in forts.iteritems():
895 newfile=
'fort.%d'%(int(i),)
896 if basedir
is not None: newfile=os.path.join(basedir,where)
899 except (EnvironmentError)
as ee:
900 if logger
is not None:
901 logger.warning(
'%s: fortcopy could not copy to %s: %s'
902 %(filename,newfile,str(ee)))
903 if not only_log_errors:
911 """!Normalizes path and expand home directories.
913 Calls os.path.normpath and os.path.expanduser on its argument, or
914 on os.getcwd() if no argument is supplied (or if path=None). This
915 removes extraneous a/./b, a/../b, expands ~username and ~, and
916 other system-specific expansions. See the Python documentation of
917 normpath and expanduser for details. Will also call realpath and
918 normcase if fullnorm=True. Raises RelativePathError if the
919 resulting path is not absolute.
920 @param path the path to expand
921 @param fullnorm If True, also call os.path.normcase() and
922 os.path.realpath() after normpath and expanduser."""
925 normpath=os.path.normpath(os.path.expanduser(path))
927 normpath=os.path.normcase(os.path.realpath(normpath))
928 if not os.path.isabs(normpath):
930 '%s: path is relative, not absolute (expands to %s)'%
935 """!Return relative path.
937 This routine generates relative file paths (using os.path.relpath)
938 that are relative to the specified "from" directory fromdir. The
939 fromdir will be first sent through norm_expand_path to eliminate
940 system-specific weirdness, such as a/./b, a/../b, ~username and so
941 on. This will raise RelativePathError if the resulting path is
944 @param fromdir the directory from which we want the relative path"""
949 """!Checks the last few bytes of a file to see if the specified
950 search string is present. Returns True if the string is present
951 or False if the file existed but the string was not present. Will
952 raise an exception if the file is non-existent or cannot be read.
954 @param filename The file to search (a string).
955 @param searchstr The string to search for. Must not contain
957 @param lastbytes The number of bytes at the end of the file to check.
958 Can be larger than the file size.
959 @param logger A logging.Logger for log messages.
960 @returns True if the file contains the given string, False otherwise"""
961 with open(str(filename),
'rt')
as f:
963 f.seek(-lastbytes,os.SEEK_END)
964 except EnvironmentError
as e:
965 if logger
is not None:
966 logger.info(
'%s: probably not an error: %s'
973 if line.find(searchstr)>=0:
975 if logger
is not None:
976 logger.info(
'%s: read %d lines'%(filename,i))
981 def check_file(filename,min_size=None,min_mtime_age=None,
982 min_atime_age=
None,min_ctime_age=
None,logger=
None):
983 """!Determines whether the specified file exists, and meets
984 additional requirements.
985 @param filename The file to analyze.
986 @param min_size If present, the file must be at least this many bytes.
987 @param min_mtime_age If specified, the file must have been modified
988 more than this many seconds in the past.
989 @param min_atime_age if specified, the file atime must be at least
990 this many seconds old. The meaning of atime varies, but
991 usually means the last access time.
992 @param min_ctime_age If specified, the file ctime must be at least
993 this many seconds old. The meaning of ctime varies between
994 platforms and file types, but usually means the file creation
995 or inode change time. See stat(2) for details.
996 @param logger a logging.Logger for log messages.
997 @note This routine can also be used on directories, but one should avoid
998 the min_size option when doing that.
999 @returns True if requirements are met, False otherwise. """
1002 if s.st_size<min_size:
1003 if logger
is not None:
1004 logger.info(
'%s: too small'%(filename,))
1006 if min_mtime_age
is not None or min_atime_age
is not None \
1007 or min_ctime_age
is not None:
1008 now=int(time.time())
1009 if min_mtime_age
is not None:
1010 if not now-s.st_mtime>min_mtime_age:
1011 if logger
is not None:
1012 logger.info(
'%s: not old enough (modification time)'
1015 if min_atime_age
is not None:
1016 if not now-s.st_atime>min_atime_age:
1017 if logger
is not None:
1018 logger.info(
'%s: not old enough (access time)'
1021 if min_ctime_age
is not None:
1022 if not now-s.st_ctime>min_ctime_age:
1023 if logger
is not None:
1024 logger.info(
'%s: not old enough (inode change time)'
1027 if logger
is not None:
1028 logger.info(
'%s: file meets requirements'%(filename,))
1030 except EnvironmentError
as e:
1031 if e.errno==errno.ENOENT:
1032 if logger
is not None:
1033 logger.info(
'%s: does not exist (ENOENT)'%(filename,))
1038 """!A class that waits for files to meet some requirements."""
1039 def __init__(self,flist=None,min_size=None,
1040 min_mtime_age=
None,min_atime_age=
None,
1043 """!Constructor for the FileWaiter. Most arguments have the same
1044 meaning as check_file()
1045 @param flist the file or list of files to wait for. This is simply
1047 @param min_size minimum file size
1048 @param min_mtime_age minimum modification time age.
1049 @param min_atime_age minimum access time age.
1050 @param min_ctime_age time since last file status change (see stat(2))
1051 @param min_fraction the minimum fraction of the provided files
1052 that must match the above requirements in order for
1053 FileWaiter.checkfiles to return True. Default is 1.0, which
1054 means all of them."""
1063 if flist
is not None: self.
add(flist)
1080 """!Adds a file, or iterable that iterates over files, to the
1081 list of files to wait for. If the same filename is received a
1082 second time, it is ignored.
1083 @param flist a filename (string) or list of filenames"""
1084 if isinstance(flist,basestring):
1085 if flist
in self.
_fset:
1087 self._flist.append(flist)
1088 self._fset.add(flist)
1093 """!Checks to see if one file meets the requirements set in the
1094 constructor. This default implementation calls check_file.
1095 This is in a separate member function so that a subclass can
1096 override the file checking method.
1097 @returns True if the file is "ready," and False if it is not.
1098 @param filename the path to the file to check
1099 @param logger a logging.Logger for messages"""
1104 """!Resets internal information about which files have been
1109 """!Iterates over all files that were found."""
1110 for filename
in self.
_found:
yield filename
1113 """!Returns the number of files that were found."""
1116 """!Returns the number of files that were NOT found."""
1119 def checkfiles(self,maxwait=1800,sleeptime=20,logger=None,
1120 log_each_file=
True):
1121 """!Looks for the requested files. Will loop, checking over
1122 and over up to maxwait seconds, sleeping sleeptime seconds
1124 @param maxwait maximum seconds to wait
1125 @param sleeptime sleep time in seconds between checks
1126 @param logger a logging.Logger for messages
1127 @param log_each_file log messages about each file checked"""
1128 maxwait=int(maxwait)
1129 start=int(time.time())
1137 if len(self.
_fset)<=0:
1138 if logger
is not None:
1139 logger.info(
'No files to check.')
1143 now=int(time.time())
1144 nfiles=len(self.
_fset)
1146 frac=float(nfound)/nfiles
1150 logger.info(
'Have required fraction of files.')
1152 if now-start>=maxwait:
1153 logger.info(
'Waited too long. Giving up.')
1157 sleepnow=max(0,min(sleeptime,start+maxwait-now-1))
1159 logger.info(
'Waited too long. Giving up.')
1161 if logger
is not None:
1162 logger.info(
'Still need files: have %d of %d, '
1163 'but need %g%% of them (%g file%s).'
1166 's' if (needfiles>1)
else ''))
1167 logfun=logger.info
if (sleepnow>=5)
else logger.debug
1168 logfun(
'Sleeping %g seconds...'%(float(sleepnow),))
1169 time.sleep(sleepnow)
1170 if logger
is not None:
1171 logfun(
'Done sleeping.')
1175 for filename
in self.
_flist:
1176 if filename
in self.
_found:
continue
1177 if self.
check(filename,logger=flogger):
1178 self._found.add(filename)
1179 if flogger
is not None:
1180 flogger.info(
'%s: found this one (%d of %d found).'
1181 %(filename,len(self.
_found),
1187 min_size=1,min_mtime_age=30,min_atime_age=
None,
1188 min_ctime_age=
None,min_fraction=1.0,
1189 log_each_file=
True):
1190 """!Waits for files to meet requirements. This is a simple
1191 wrapper around the FileWaiter class for convenience. It is
1192 equivalent to creating a FileWaiter with the provided arguments,
1193 and calling its checkfiles routine.
1194 @param flist the file or list of files to wait for. This is simply
1196 @param logger a logging.Logger for messages
1197 @param maxwait maximum seconds to wait
1198 @param sleeptime sleep time in seconds between checks
1199 @param min_size minimum file size
1200 @param min_mtime_age minimum modification time age.
1201 @param min_atime_age minimum access time age.
1202 @param min_ctime_age time since last file status change (see stat(2))
1203 @param min_fraction the minimum fraction of the provided files
1204 that must match the above requirements in order for
1205 FileWaiter.checkfiles to return True. Default is 1.0, which
1207 @param log_each_file log messages about each file checked """
1208 waiter=
FileWaiter(flist,min_size,min_mtime_age,min_atime_age,
1209 min_ctime_age,min_fraction)
1210 return waiter.checkfiles(maxwait,sleeptime,logger,log_each_file)
This exception is raised when the caller tries to create multiple symlinks in a single target...
def deliver_file
This moves or copies the file "infile" to "outfile" in a unit operation; outfile will never be seen i...
def norm_abs_path
Return relative path.
def netcdfver(filename)
What is the NetCDF version of this file?
min_fraction
The minimum fraction of files that must meet the requirements.
def call_fcntrl
Internal function that implements unblock()
verifyfile
The file to verify.
def __init__
Constructor for the FileWaiter.
def lstat_stat
Runs lstat and stat on a file as efficiently as possible.
This is the superclass of several exceptions relating to multi-file operations in produtil...
def touch
Open the file for append and set mtime and atime.
def remove_file
Deletes the specified file.
def unblock
Attempts to modify the given stream to be non-blocking.
def check_file
Determines whether the specified file exists, and meets additional requirements.
Thrown when find_exe is given an executable name that contains a directory path.
This exception indicates that the renamer function sent to make_symlinks_in returned an absolute path...
def symlink_read_test
Opens the specified file for reading and attempts to read data from it.
def fortcopy(forts, basedir=None, logger=None, only_log_errors=False, kwargs)
A convenience function for copying files to local fort.N files for various integers N using deliver_f...
def replace_symlink
Do not call this routine directly: you want make_symlink instead.
This exception is raised when an operation that processes multiple files catches more than one except...
message
The error message.
def use_acl_for_rstdata()
Synonym for here.use_acl_for_rstdata.
def reset(self)
Resets internal information about which files have been seen.
def gribver(filename)
What is the GRIB version of this file?
def make_symlinks_in
Creates symbolic links from a set of source files to a target directory.
message
String description of the problem.
def isnonempty(filename)
Returns True if the filename refers to an existent file that is non-empty, and False otherwise...
Thrown when find_exe cannot find an executable in the path or directory list.
def makedirs
Make a directory tree, working around filesystem bugs.
def __init__
FileOpError constructor.
def __init__(self, message, fromfile, tofile, verifyfile)
VerificationFailed constructor.
Provides information about the cluster on which this job is running.
def norm_expand_path
Normalizes path and expand home directories.
def fortlink
This is a convenience routine that makes many symbolic links to fort.N files for various integers N u...
This exception is raised when a copy of a file has different content than the original.
Raised when os.symlink makes a symlink to a target other than the one that was requested.
def __init__(self, message, fromfile, tofile)
DeliveryFailed constructor.
def countmissing(self)
Returns the number of files that were NOT found.
def __repr__(self)
Pythonic representation of this error.
This exception is raised when a file cannot be delivered.
more
A list of tuples, (from,to,message) where from is the source file, to is the destination file and mes...
Thrown when find_exe fails.
def __str__(self)
Human-readable description of this error.
def __str__(self)
A string description of the problem.
Raised when a relative path is given, but an absolute path is expected.
def wait_for_files
Waits for files to meet requirements.
def check
Checks to see if one file meets the requirements set in the constructor.
def __iter__(self)
Iterates over a list of tuples, (from,to,message) where from is the source file, to is the destinatio...
def countfound(self)
Returns the number of files that were found.
min_size
The minimum file size.
def check_last_lines
Checks the last few bytes of a file to see if the specified search string is present.
def group_quotas()
Synonym for here.group_quotas.
min_atime_age
Minimum age of the access time.
filename
The name of the problematic file.
def checkfiles
Looks for the requested files.
def realcwd()
Returns the current working directory, expanding any symbolic links.
def chdir
Changes to the specified directory.
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
def rmall(args, kwargs)
Deletes the specified list of files.
A class that waits for files to meet some requirements.
Internal module that launches and monitors processes.
def __str__(self)
Human-readable description of this error.
def add(self, flist)
Adds a file, or iterable that iterates over files, to the list of files to wait for.
def __repr__(self)
Pythonic representation of this error.
def iterfound(self)
Iterates over all files that were found.
min_mtime_age
Minimum age of the modification time.
min_ctime_age
Minimum age of the creation and/or inode change time.
def make_symlink
Creates a symbolic link "target" that points to "source".