📄 tarfile.py
字号:
if hasattr(os, "mkfifo"):
os.mkfifo(targetpath)
else:
raise ExtractError, "fifo not supported by system"
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node described by tarinfo.

    The node is created at targetpath with tarinfo.mode plus the
    block-device bit (when tarinfo.isblk() is true) or the
    character-device bit (otherwise). The device number is built from
    tarinfo.devmajor and tarinfo.devminor.

    Raises ExtractError if the platform lacks os.mknod or os.makedev.
    """
    if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
        raise ExtractError("special devices not supported by system")

    mode = tarinfo.mode
    if tarinfo.isblk():
        mode |= stat.S_IFBLK
    else:
        mode |= stat.S_IFCHR

    # The guard above guarantees that os.makedev exists, so no fallback
    # that hands major/minor directly to os.mknod is needed here.
    os.mknod(targetpath, mode,
             os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makelink(self, tarinfo, targetpath):
    """Create the (symbolic) link described by tarinfo at targetpath.

    If the platform cannot create links (os.symlink or os.link is
    missing, which surfaces as AttributeError), fall back to a copy:
    first try to extract the referenced archive member to targetpath,
    then try to copy the referenced file from the filesystem.

    Raises IOError if neither a link nor a copy could be made.
    """
    source = tarinfo.linkname
    try:
        if tarinfo.issym():
            os.symlink(source, targetpath)
        else:
            os.link(source, targetpath)
    except AttributeError:
        # Links are not supported here; emulate them with a copy below.
        pass
    else:
        return

    if tarinfo.issym():
        # A symlink target is relative to the directory of the link.
        source = normpath(os.path.join(os.path.dirname(tarinfo.name),
                                       source))

    try:
        self._extract_member(self.getmember(source), targetpath)
    except (EnvironmentError, KeyError):
        # Not extractable from the archive: try the filesystem instead.
        source = os.path.normpath(source)
        try:
            shutil.copy2(source, targetpath)
        except EnvironmentError:
            raise IOError("link could not be created")
def chown(self, tarinfo, targetpath):
    """Give targetpath the owner and group recorded in tarinfo.

    Nothing is done unless the pwd module is available and the process
    runs with effective user id 0. Group and user are resolved by name
    first, then by numeric id, and finally default to the ids of the
    current process.

    Raises ExtractError if changing the owner fails.
    """
    if not pwd or not hasattr(os, "geteuid") or os.geteuid() != 0:
        # We have to be root to do so.
        return

    try:
        gid = grp.getgrnam(tarinfo.gname)[2]
    except KeyError:
        try:
            gid = grp.getgrgid(tarinfo.gid)[2]
        except KeyError:
            gid = os.getgid()

    try:
        uid = pwd.getpwnam(tarinfo.uname)[2]
    except KeyError:
        try:
            uid = pwd.getpwuid(tarinfo.uid)[2]
        except KeyError:
            uid = os.getuid()

    try:
        # Prefer lchown for symlinks where the platform offers it.
        if tarinfo.issym() and hasattr(os, "lchown"):
            os.lchown(targetpath, uid, gid)
        else:
            os.chown(targetpath, uid, gid)
    except EnvironmentError:
        raise ExtractError("could not change owner")
def chmod(self, tarinfo, targetpath):
    """Apply the permission bits in tarinfo.mode to targetpath.

    Raises ExtractError if the mode cannot be changed.
    """
    try:
        os.chmod(targetpath, tarinfo.mode)
    except EnvironmentError:
        raise ExtractError("could not change mode")
def utime(self, tarinfo, targetpath):
    """Set access and modification time of targetpath to tarinfo.mtime.

    Directories are skipped on win32. Raises ExtractError if the
    timestamps cannot be changed.
    """
    on_windows = sys.platform == "win32"
    if on_windows and tarinfo.isdir():
        # According to msdn.microsoft.com, it is an error (EACCES)
        # to use utime() on directories.
        return

    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def next(self):
    """Return the next member of the archive as a TarInfo object.

    Returns None when no further member can be read. A member waiting
    in self.firstmember is returned first (and cleared). Every member
    read from the file is appended to self.members, its name to
    self.membernames, and the offset of the following header to
    self.chunks.

    Raises IOError (through self._check) if the archive is closed or
    not opened in mode "r" or "a". Raises ReadError if the block at
    offset 0 is not a valid header.
    """
    self._check("ra")
    if self.firstmember is not None:
        m = self.firstmember
        self.firstmember = None
        return m

    # Read the next block.
    self.fileobj.seek(self.chunks[-1])
    while True:
        buf = self.fileobj.read(BLOCKSIZE)
        if not buf:
            return None
        try:
            tarinfo = TarInfo.frombuf(buf)
        except ValueError:
            if self.ignore_zeros:
                # Skip over the bad block and keep looking for a header.
                if buf.count(NUL) == BLOCKSIZE:
                    adj = "empty"
                else:
                    adj = "invalid"
                self._dbg(2, "0x%X: %s block" % (self.offset, adj))
                self.offset += BLOCKSIZE
                continue
            else:
                # Block is empty or unreadable.
                if self.chunks[-1] == 0:
                    # The very first block is invalid, so this does not
                    # look like a tar archive we can handle.
                    raise ReadError("empty, unreadable or compressed file")
                return None
        break

    # We shouldn't rely on this checksum, because some tar programs
    # calculate it differently and it is merely validating the
    # header block. We could just as well skip this part, which would
    # have a slight effect on performance...
    if tarinfo.chksum != calc_chksum(buf):
        self._dbg(1, "tarfile: Bad Checksum %r" % tarinfo.name)

    # Set the TarInfo object's offset to the current position of the
    # TarFile and set self.offset to the position where the data blocks
    # should begin.
    tarinfo.offset = self.offset
    self.offset += BLOCKSIZE

    # Check if the TarInfo object has a typeflag for which a callback
    # method is registered in the TYPE_METH. If so, then call it.
    if tarinfo.type in self.TYPE_METH:
        tarinfo = self.TYPE_METH[tarinfo.type](self, tarinfo)
    else:
        tarinfo.offset_data = self.offset
        if tarinfo.isreg() or tarinfo.type not in SUPPORTED_TYPES:
            # Skip the following data blocks.
            self.offset += self._block(tarinfo.size)

    # Some old tar programs don't know DIRTYPE and mark a directory as a
    # regular file whose name ends with a slash. The test must look at
    # the LAST character; slicing with [:-1] would compare everything
    # but the last character and practically never match.
    if tarinfo.isreg() and tarinfo.name.endswith("/"):
        tarinfo.type = DIRTYPE

    self.members.append(tarinfo)
    self.membernames.append(tarinfo.name)
    self.chunks.append(self.offset)
    return tarinfo
#--------------------------------------------------------------------------
# Below are some methods which are called for special typeflags in the
# next() method, e.g. for unwrapping GNU longname/longlink blocks. They
# are registered in TYPE_METH below. You can register your own methods
# with this mapping.
# A registered method is called as method(self, tarinfo), i.e. with the
# TarFile instance and the TarInfo object whose header was just read.
#
# During its execution the method MUST perform the following tasks:
# 1. set tarinfo.offset_data to the position where the data blocks begin,
# if there is data to follow.
# 2. set self.offset to the position where the next member's header will
# begin.
# 3. return a valid TarInfo object.
def proc_gnulong(self, tarinfo):
    """Handle a GNUTYPE_LONGNAME or GNUTYPE_LONGLINK member.

    The data blocks following tarinfo carry a long name or link name.
    They are read, then the next header is parsed and returned as a
    TarInfo object with that long name or link name applied.
    self.offset is advanced past everything that was consumed.
    """
    pieces = []
    remaining = tarinfo.size
    while remaining > 0:
        pieces.append(self.fileobj.read(BLOCKSIZE))
        self.offset += BLOCKSIZE
        remaining -= BLOCKSIZE
    payload = "".join(pieces)

    longname = None
    longlink = None
    if tarinfo.type == GNUTYPE_LONGNAME:
        longname = nts(payload)
    if tarinfo.type == GNUTYPE_LONGLINK:
        longlink = nts(payload)

    # The header that follows describes the member the long string
    # belongs to.
    header = self.fileobj.read(BLOCKSIZE)
    member = TarInfo.frombuf(header)
    member.offset = self.offset
    self.offset += BLOCKSIZE
    member.offset_data = self.offset
    member.name = longname or member.name
    member.linkname = longlink or member.linkname

    if member.isreg() or member.type not in SUPPORTED_TYPES:
        # Skip the following data blocks.
        self.offset += self._block(member.size)
    return member
def proc_sparse(self, tarinfo):
    """Handle a member whose typeflag is registered as GNUTYPE_SPARSE.

    Parses the sparse map from the header (and from any extension
    blocks that follow it), stores the resulting list of _data/_hole
    sections in tarinfo.sparse, sets tarinfo.offset_data, advances
    self.offset past the stored data blocks and finally replaces
    tarinfo.size with the original file size taken from the header.
    Returns the modified tarinfo.
    """
    # NOTE(review): presumably tobuf() reproduces the raw header block
    # so that its fields can be sliced by byte position -- confirm.
    buf = tarinfo.tobuf()
    sp = _ringbuffer()
    pos = 386
    # lastpos: end of the previous section within the expanded file.
    # realpos: running total of data bytes seen so far; it is passed to
    # each _data section as its realpos.
    lastpos = 0L
    realpos = 0L
    # There are 4 possible sparse structs in the
    # first header.
    # Each struct is 24 bytes: a 12-byte octal offset followed by a
    # 12-byte octal byte count.
    for i in xrange(4):
        try:
            offset = int(buf[pos:pos + 12], 8)
            numbytes = int(buf[pos + 12:pos + 24], 8)
        except ValueError:
            # First entry that is not an octal number ends the list.
            break
        if offset > lastpos:
            # Gap between the previous section and this one.
            sp.append(_hole(lastpos, offset - lastpos))
        sp.append(_data(offset, numbytes, realpos))
        realpos += numbytes
        lastpos = offset + numbytes
        pos += 24

    # Byte 482 holds the "extended" flag, bytes 483-494 the original
    # file size as an octal number.
    isextended = ord(buf[482])
    origsize = int(buf[483:495], 8)

    # If the isextended flag is given,
    # there are extra headers to process.
    while isextended == 1:
        buf = self.fileobj.read(BLOCKSIZE)
        self.offset += BLOCKSIZE
        pos = 0
        # An extension block holds up to 21 structs starting at byte 0.
        for i in xrange(21):
            try:
                offset = int(buf[pos:pos + 12], 8)
                numbytes = int(buf[pos + 12:pos + 24], 8)
            except ValueError:
                break
            if offset > lastpos:
                sp.append(_hole(lastpos, offset - lastpos))
            sp.append(_data(offset, numbytes, realpos))
            realpos += numbytes
            lastpos = offset + numbytes
            pos += 24
        # Byte 504 of an extension block tells whether another follows.
        isextended = ord(buf[504])

    if lastpos < origsize:
        # The file ends with a hole.
        sp.append(_hole(lastpos, origsize - lastpos))

    tarinfo.sparse = sp

    tarinfo.offset_data = self.offset
    # Skip the stored data using the size from the header, then report
    # the original size to callers.
    self.offset += self._block(tarinfo.size)
    tarinfo.size = origsize

    return tarinfo
# The type mapping for the next() method. The keys are single character
# strings, the typeflag. The values are methods which are called when
# next() encounters such a typeflag.
# The values are referenced by their plain names here, and next() calls
# them as TYPE_METH[typeflag](self, tarinfo), passing the TarFile
# instance explicitly.
TYPE_METH = {
    GNUTYPE_LONGNAME: proc_gnulong,
    GNUTYPE_LONGLINK: proc_gnulong,
    GNUTYPE_SPARSE: proc_sparse
}
#--------------------------------------------------------------------------
# Little helper methods:
def _block(self, count):
    """Round count up to the next multiple of BLOCKSIZE.

    A count that is already a multiple of BLOCKSIZE is returned
    unchanged.
    """
    full_blocks = (count + BLOCKSIZE - 1) // BLOCKSIZE
    return full_blocks * BLOCKSIZE
def _getmember(self, name, tarinfo=None):
    """Return the last member called name, or None if there is none.

    The search runs backwards through self.membernames. When tarinfo
    is given, only members positioned before it in self.members are
    considered.
    """
    if tarinfo is None:
        stop = len(self.members)
    else:
        stop = self.members.index(tarinfo)

    idx = stop
    while idx > 0:
        idx -= 1
        if name == self.membernames[idx]:
            return self.members[idx]
    return None
def _load(self):
    """Read members with self.next() until it returns None.

    Afterwards self._loaded is set to True.
    """
    while self.next() is not None:
        pass
    self._loaded = True
def _check(self, mode=None):
    """Verify that the archive is open and, optionally, in a given mode.

    mode is a collection of acceptable values for self._mode; None
    skips the mode test. Raises IOError if the archive is closed or
    self._mode is not contained in mode.
    """
    if self.closed:
        raise IOError("%s is closed" % self.__class__.__name__)
    if mode is None:
        return
    if self._mode not in mode:
        raise IOError("bad operation for mode %r" % self._mode)
def __iter__(self):
    """Return an iterator over the members of the archive.

    Once every member has been read (self._loaded), the cached list in
    self.members is iterated; until then a TarIter reads them lazily.
    """
    if not self._loaded:
        return TarIter(self)
    return iter(self.members)
def _create_gnulong(self, name, type):
    """Write a GNU longname/longlink pseudo-member for name.

    The pseudo-member consists of a header named "././@LongLink" with
    typeflag `type`, followed by data blocks that hold `name` as a
    NUL-terminated string padded with NULs to a BLOCKSIZE boundary.
    self.offset is advanced by everything that was written.
    """
    # Always store the terminating NUL and count it in the size field.
    # Without it, a name whose length is an exact multiple of BLOCKSIZE
    # would be written with no terminator at all.
    name += NUL

    tarinfo = TarInfo()
    tarinfo.name = "././@LongLink"
    tarinfo.type = type
    tarinfo.mode = 0
    tarinfo.size = len(name)

    # write extended header
    header = tarinfo.tobuf()
    self.fileobj.write(header)
    # The header occupies space in the archive too; account for it so
    # that self.offset keeps matching the amount of data written.
    self.offset += len(header)

    # write name blocks
    self.fileobj.write(name)
    blocks, remainder = divmod(tarinfo.size, BLOCKSIZE)
    if remainder > 0:
        self.fileobj.write(NUL * (BLOCKSIZE - remainder))
        blocks += 1
    self.offset += blocks * BLOCKSIZE
def _dbg(self, level, msg):
    """Print msg to sys.stderr unless level exceeds self.debug."""
    if level > self.debug:
        return
    print >> sys.stderr, msg
# class TarFile
class TarIter:
    """Iterator that pulls members from a TarFile one at a time.

    Each step calls the wrapped object's next() method. When that
    returns a false value, the object is flagged as fully loaded and
    iteration stops.
    """

    def __init__(self, tarfile):
        """Remember the TarFile object to iterate over."""
        self.tarfile = tarfile

    def __iter__(self):
        """Return the iterator itself."""
        return self

    def next(self):
        """Return the next member or raise StopIteration at the end."""
        member = self.tarfile.next()
        if member:
            return member
        self.tarfile._loaded = True
        raise StopIteration
# Helper classes for sparse file support
class _section:
    """A contiguous range of `size` bytes starting at `offset`.

    `pos in section` is true when offset <= pos < offset + size.
    """

    def __init__(self, offset, size):
        self.offset, self.size = offset, size

    def __contains__(self, offset):
        begin = self.offset
        return begin <= offset < begin + self.size
class _data(_section):
    """A section that additionally records a `realpos` value."""

    def __init__(self, offset, size, realpos):
        self.realpos = realpos
        _section.__init__(self, offset, size)
class _hole(_section):
    """A section that marks a hole; it adds nothing to _section."""
class _ringbuffer(list):
    """List whose find() resumes searching where the last hit was.

    self.idx holds the index of the item returned by the most recent
    successful find(); the next search starts there and wraps around.
    """

    def __init__(self):
        self.idx = 0

    def find(self, offset):
        """Return the first item that contains offset, or None.

        Items are tested with `offset in item`, beginning at self.idx
        and wrapping around once. On success self.idx is moved to the
        matching item. An empty buffer yields None instead of raising
        IndexError.
        """
        count = len(self)
        if not count:
            # Nothing stored, so nothing can match.
            return None

        idx = self.idx
        for _ in range(count):
            item = self[idx]
            if offset in item:
                self.idx = idx
                return item
            idx += 1
            if idx == count:
                idx = 0
        # Every item was inspected once: end of file.
        return None
#---------------------------------------------
# zipfile compatible TarFile class
#---------------------------------------------
# Compression constants accepted by TarFileCompat: TAR_PLAIN opens the
# archive with TarFile.taropen, TAR_GZIPPED with TarFile.gzopen. The
# trailing comments name the zipfile constants they correspond to.
TAR_PLAIN = 0 # zipfile.ZIP_STORED
TAR_GZIPPED = 8 # zipfile.ZIP_DEFLATED
class TarFileCompat:
    """Wrapper giving a TarFile an interface modelled on zipfile.

    The wrapped archive is available as self.tarfile.
    """

    def __init__(self, file, mode="r", compression=TAR_PLAIN):
        """Open file as a plain (TAR_PLAIN) or gzipped (TAR_GZIPPED) tar.

        When reading, every member additionally gets the attributes
        filename, file_size and date_time. Raises ValueError for an
        unknown compression constant.
        """
        if compression == TAR_PLAIN:
            opener = TarFile.taropen
        elif compression == TAR_GZIPPED:
            opener = TarFile.gzopen
        else:
            raise ValueError("unknown compression constant")
        self.tarfile = opener(file, mode)

        if mode[0:1] == "r":
            for member in self.tarfile.getmembers():
                member.filename = member.name
                member.file_size = member.size
                member.date_time = time.gmtime(member.mtime)[:6]

    def namelist(self):
        """Return the names of the members listed by infolist()."""
        return [member.name for member in self.infolist()]

    def infolist(self):
        """Return the members whose type is in REGULAR_TYPES."""
        return [member for member in self.tarfile.getmembers()
                if member.type in REGULAR_TYPES]

    def printdir(self):
        """Print a listing of the archive via TarFile.list()."""
        self.tarfile.list()

    def testzip(self):
        """Do nothing and return None."""
        return

    def getinfo(self, name):
        """Return the member object called name."""
        return self.tarfile.getmember(name)

    def read(self, name):
        """Return the contents of the member called name."""
        member = self.tarfile.getmember(name)
        return self.tarfile.extractfile(member).read()

    def write(self, filename, arcname=None, compress_type=None):
        """Add filename to the archive; compress_type is ignored."""
        self.tarfile.add(filename, arcname)

    def writestr(self, zinfo, bytes):
        """Add the string bytes as a member described by zinfo.

        zinfo must provide filename, file_size and date_time; they are
        copied to the name, size and mtime attributes first.
        """
        import StringIO
        import calendar
        zinfo.name = zinfo.filename
        zinfo.size = zinfo.file_size
        zinfo.mtime = calendar.timegm(zinfo.date_time)
        payload = StringIO.StringIO(bytes)
        self.tarfile.addfile(zinfo, payload)

    def close(self):
        """Close the wrapped archive."""
        self.tarfile.close()
#class TarFileCompat
#--------------------
# exported functions
#--------------------
def is_tarfile(name):
    """Return True if name can be opened as a tar archive.

    The file is opened with this module's open() and closed again
    right away. A TarError during that attempt yields False.
    """
    try:
        archive = open(name)
        archive.close()
    except TarError:
        return False
    else:
        return True
# Module-level shortcut for TarFile.open. This rebinding shadows the
# builtin open() inside this module; is_tarfile() above calls it.
open = TarFile.open
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -