📄 tarfile.py
字号:
try:
t = cls.taropen(tarname, mode,
gzip.GzipFile(name, mode, compresslevel, fileobj)
)
except IOError:
raise ReadError, "not a gzip file"
t._extfileobj = False
return t
gzopen = classmethod(gzopen)
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9):
if len(mode) > 1 or mode not in "rw":
raise ValueError, "mode must be 'r' or 'w'."
try:
import bz2
except ImportError:
raise CompressionError, "bz2 module is not available"
pre, ext = os.path.splitext(name)
pre = os.path.basename(pre)
if ext == ".tbz2":
ext = ".tar"
if ext == ".bz2":
ext = ""
tarname = pre + ext
if fileobj is not None:
raise ValueError, "no support for external file objects"
try:
t = cls.taropen(tarname, mode, bz2.BZ2File(name, mode, compresslevel=compresslevel))
except IOError:
raise ReadError, "not a bzip2 file"
t._extfileobj = False
return t
bz2open = classmethod(bz2open)
# All *open() methods are registered here.
OPEN_METH = {
"tar": "taropen", # uncompressed tar
"gz": "gzopen", # gzip compressed tar
"bz2": "bz2open" # bzip2 compressed tar
}
#--------------------------------------------------------------------------
# The public methods which TarFile provides:
def close(self):
if self.closed:
return
if self._mode in "aw":
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
# (like option -b20 for tar does)
blocks, remainder = divmod(self.offset, RECORDSIZE)
if remainder > 0:
self.fileobj.write(NUL * (RECORDSIZE - remainder))
if not self._extfileobj:
self.fileobj.close()
self.closed = True
def getmember(self, name):
self._check()
if name not in self.membernames and not self._loaded:
self._load()
if name not in self.membernames:
raise KeyError, "filename %r not found" % name
return self._getmember(name)
def getmembers(self):
self._check()
if not self._loaded: # if we want to obtain a list of
self._load() # all members, we first have to
# scan the whole archive.
return self.members
def getnames(self):
self._check()
if not self._loaded:
self._load()
return self.membernames
def gettarinfo(self, name=None, arcname=None, fileobj=None):
self._check("aw")
# When fileobj is given, replace name by
# fileobj's real name.
if fileobj is not None:
name = fileobj.name
# Building the name of the member in the archive.
# Backward slashes are converted to forward slashes,
# Absolute paths are turned to relative paths.
if arcname is None:
arcname = name
arcname = normpath(arcname)
drv, arcname = os.path.splitdrive(arcname)
while arcname[0:1] == "/":
arcname = arcname[1:]
# Now, fill the TarInfo object with
# information specific for the file.
tarinfo = TarInfo()
# Use os.stat or os.lstat, depending on platform
# and if symlinks shall be resolved.
if fileobj is None:
if hasattr(os, "lstat") and not self.dereference:
statres = os.lstat(name)
else:
statres = os.stat(name)
else:
statres = os.fstat(fileobj.fileno())
linkname = ""
stmd = statres.st_mode
if stat.S_ISREG(stmd):
inode = (statres.st_ino, statres.st_dev)
if inode in self.inodes and not self.dereference:
# Is it a hardlink to an already
# archived file?
type = LNKTYPE
linkname = self.inodes[inode]
else:
# The inode is added only if its valid.
# For win32 it is always 0.
type = REGTYPE
if inode[0]:
self.inodes[inode] = arcname
elif stat.S_ISDIR(stmd):
type = DIRTYPE
if arcname[-1:] != "/":
arcname += "/"
elif stat.S_ISFIFO(stmd):
type = FIFOTYPE
elif stat.S_ISLNK(stmd):
type = SYMTYPE
linkname = os.readlink(name)
elif stat.S_ISCHR(stmd):
type = CHRTYPE
elif stat.S_ISBLK(stmd):
type = BLKTYPE
else:
return None
# Fill the TarInfo object with all
# information we can get.
tarinfo.name = arcname
tarinfo.mode = stmd
tarinfo.uid = statres.st_uid
tarinfo.gid = statres.st_gid
tarinfo.size = statres.st_size
tarinfo.mtime = statres.st_mtime
tarinfo.type = type
tarinfo.linkname = linkname
if pwd:
try:
tarinfo.uname = pwd.getpwuid(tarinfo.uid)[0]
except KeyError:
pass
if grp:
try:
tarinfo.gname = grp.getgrgid(tarinfo.gid)[0]
except KeyError:
pass
if type in (CHRTYPE, BLKTYPE):
if hasattr(os, "major") and hasattr(os, "minor"):
tarinfo.devmajor = os.major(statres.st_rdev)
tarinfo.devminor = os.minor(statres.st_rdev)
return tarinfo
def list(self, verbose=True):
self._check()
for tarinfo in self:
if verbose:
print filemode(tarinfo.mode),
print "%s/%s" % (tarinfo.uname or tarinfo.uid,
tarinfo.gname or tarinfo.gid),
if tarinfo.ischr() or tarinfo.isblk():
print "%10s" % ("%d,%d" \
% (tarinfo.devmajor, tarinfo.devminor)),
else:
print "%10d" % tarinfo.size,
print "%d-%02d-%02d %02d:%02d:%02d" \
% time.localtime(tarinfo.mtime)[:6],
print tarinfo.name,
if verbose:
if tarinfo.issym():
print "->", tarinfo.linkname,
if tarinfo.islnk():
print "link to", tarinfo.linkname,
print
def add(self, name, arcname=None, recursive=True):
self._check("aw")
if arcname is None:
arcname = name
# Skip if somebody tries to archive the archive...
if self.name is not None \
and os.path.abspath(name) == os.path.abspath(self.name):
self._dbg(2, "tarfile: Skipped %r" % name)
return
# Special case: The user wants to add the current
# working directory.
if name == ".":
if recursive:
if arcname == ".":
arcname = ""
for f in os.listdir("."):
self.add(f, os.path.join(arcname, f))
return
self._dbg(1, name)
# Create a TarInfo object from the file.
tarinfo = self.gettarinfo(name, arcname)
if tarinfo is None:
self._dbg(1, "tarfile: Unsupported type %r" % name)
return
# Append the tar header and data to the archive.
if tarinfo.isreg():
f = file(name, "rb")
self.addfile(tarinfo, f)
f.close()
if tarinfo.type in (LNKTYPE, SYMTYPE, FIFOTYPE, CHRTYPE, BLKTYPE):
tarinfo.size = 0L
self.addfile(tarinfo)
if tarinfo.isdir():
self.addfile(tarinfo)
if recursive:
for f in os.listdir(name):
self.add(os.path.join(name, f), os.path.join(arcname, f))
def addfile(self, tarinfo, fileobj=None):
self._check("aw")
tarinfo.name = normpath(tarinfo.name)
if tarinfo.isdir():
tarinfo.name += "/"
if tarinfo.linkname:
tarinfo.linkname = normpath(tarinfo.linkname)
if tarinfo.size > MAXSIZE_MEMBER:
raise ValueError, "file is too large (>8GB)"
if len(tarinfo.linkname) > LENGTH_LINK:
if self.posix:
raise ValueError, "linkname is too long (>%d)" \
% (LENGTH_LINK)
else:
self._create_gnulong(tarinfo.linkname, GNUTYPE_LONGLINK)
tarinfo.linkname = tarinfo.linkname[:LENGTH_LINK -1]
self._dbg(2, "tarfile: Created GNU tar extension LONGLINK")
if len(tarinfo.name) > LENGTH_NAME:
if self.posix:
prefix = tarinfo.name[:LENGTH_PREFIX + 1]
while prefix and prefix[-1] != "/":
prefix = prefix[:-1]
name = tarinfo.name[len(prefix):]
prefix = prefix[:-1]
if not prefix or len(name) > LENGTH_NAME:
raise ValueError, "name is too long (>%d)" \
% (LENGTH_NAME)
tarinfo.name = name
tarinfo.prefix = prefix
else:
self._create_gnulong(tarinfo.name, GNUTYPE_LONGNAME)
tarinfo.name = tarinfo.name[:LENGTH_NAME - 1]
self._dbg(2, "tarfile: Created GNU tar extension LONGNAME")
self.fileobj.write(tarinfo.tobuf())
self.offset += BLOCKSIZE
# If there's data to follow, append it.
if fileobj is not None:
copyfileobj(fileobj, self.fileobj, tarinfo.size)
blocks, remainder = divmod(tarinfo.size, BLOCKSIZE)
if remainder > 0:
self.fileobj.write(NUL * (BLOCKSIZE - remainder))
blocks += 1
self.offset += blocks * BLOCKSIZE
self.members.append(tarinfo)
self.membernames.append(tarinfo.name)
self.chunks.append(self.offset)
def extract(self, member, path=""):
self._check("r")
if isinstance(member, TarInfo):
tarinfo = member
else:
tarinfo = self.getmember(member)
try:
self._extract_member(tarinfo, os.path.join(path, tarinfo.name))
except EnvironmentError, e:
if self.errorlevel > 0:
raise
else:
if e.filename is None:
self._dbg(1, "tarfile: %s" % e.strerror)
else:
self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename))
except ExtractError, e:
if self.errorlevel > 1:
raise
else:
self._dbg(1, "tarfile: %s" % e)
def extractfile(self, member):
self._check("r")
if isinstance(member, TarInfo):
tarinfo = member
else:
tarinfo = self.getmember(member)
if tarinfo.isreg():
return self.fileobject(self, tarinfo)
elif tarinfo.type not in SUPPORTED_TYPES:
# If a member's type is unknown, it is treated as a
# regular file.
return self.fileobject(self, tarinfo)
elif tarinfo.islnk() or tarinfo.issym():
if isinstance(self.fileobj, _Stream):
# A small but ugly workaround for the case that someone tries
# to extract a (sym)link as a file-object from a non-seekable
# stream of tar blocks.
raise StreamError, "cannot extract (sym)link as file object"
else:
# A (sym)link's file object is it's target's file object.
return self.extractfile(self._getmember(tarinfo.linkname,
tarinfo))
else:
# If there's no data associated with the member (directory, chrdev,
# blkdev, etc.), return None instead of a file object.
return None
def _extract_member(self, tarinfo, targetpath):
# Fetch the TarInfo object for the given name
# and build the destination pathname, replacing
# forward slashes to platform specific separators.
if targetpath[-1:] == "/":
targetpath = targetpath[:-1]
targetpath = os.path.normpath(targetpath)
# Create all upper directories.
upperdirs = os.path.dirname(targetpath)
if upperdirs and not os.path.exists(upperdirs):
ti = TarInfo()
ti.name = upperdirs
ti.type = DIRTYPE
ti.mode = 0777
ti.mtime = tarinfo.mtime
ti.uid = tarinfo.uid
ti.gid = tarinfo.gid
ti.uname = tarinfo.uname
ti.gname = tarinfo.gname
try:
self._extract_member(ti, ti.name)
except:
pass
if tarinfo.islnk() or tarinfo.issym():
self._dbg(1, "%s -> %s" % (tarinfo.name, tarinfo.linkname))
else:
self._dbg(1, tarinfo.name)
if tarinfo.isreg():
self.makefile(tarinfo, targetpath)
elif tarinfo.isdir():
self.makedir(tarinfo, targetpath)
elif tarinfo.isfifo():
self.makefifo(tarinfo, targetpath)
elif tarinfo.ischr() or tarinfo.isblk():
self.makedev(tarinfo, targetpath)
elif tarinfo.islnk() or tarinfo.issym():
self.makelink(tarinfo, targetpath)
elif tarinfo.type not in SUPPORTED_TYPES:
self.makeunknown(tarinfo, targetpath)
else:
self.makefile(tarinfo, targetpath)
self.chown(tarinfo, targetpath)
if not tarinfo.issym():
self.chmod(tarinfo, targetpath)
self.utime(tarinfo, targetpath)
#--------------------------------------------------------------------------
# Below are the different file methods. They are called via
# _extract_member() when extract() is called. They can be replaced in a
# subclass to implement other functionality.
def makedir(self, tarinfo, targetpath):
try:
os.mkdir(targetpath)
except EnvironmentError, e:
if e.errno != errno.EEXIST:
raise
def makefile(self, tarinfo, targetpath):
source = self.extractfile(tarinfo)
target = file(targetpath, "wb")
copyfileobj(source, target)
source.close()
target.close()
def makeunknown(self, tarinfo, targetpath):
self.makefile(tarinfo, targetpath)
self._dbg(1, "tarfile: Unknown file type %r, " \
"extracted as regular file." % tarinfo.type)
def makefifo(self, tarinfo, targetpath):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -